GNU Linux-libre 4.14.302-gnu1
[releases.git] / drivers / staging / lustre / lustre / lov / lov_obd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2015, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/lov/lov_obd.c
33  *
34  * Author: Phil Schwan <phil@clusterfs.com>
35  * Author: Peter Braam <braam@clusterfs.com>
36  * Author: Mike Shaver <shaver@clusterfs.com>
37  * Author: Nathan Rutman <nathan@clusterfs.com>
38  */
39
40 #define DEBUG_SUBSYSTEM S_LOV
41 #include <linux/libcfs/libcfs.h>
42
43 #include <uapi/linux/lustre/lustre_idl.h>
44 #include <uapi/linux/lustre/lustre_ioctl.h>
45
46 #include <cl_object.h>
47 #include <lustre_dlm.h>
48 #include <lustre_fid.h>
49 #include <lustre_lib.h>
50 #include <lustre_mds.h>
51 #include <lustre_net.h>
52 #include <uapi/linux/lustre/lustre_param.h>
53 #include <lustre_swab.h>
54 #include <lprocfs_status.h>
55 #include <obd_class.h>
56 #include <obd_support.h>
57
58 #include "lov_internal.h"
59
60 /* Keep a refcount of lov->tgt usage to prevent racing with addition/deletion.
61  * Any function that expects lov_tgts to remain stationary must take a ref.
62  */
63 static void lov_getref(struct obd_device *obd)
64 {
65         struct lov_obd *lov = &obd->u.lov;
66
67         /* nobody gets through here until lov_putref is done */
68         mutex_lock(&lov->lov_lock);
69         atomic_inc(&lov->lov_refcount);
70         mutex_unlock(&lov->lov_lock);
71 }
72
73 static void __lov_del_obd(struct obd_device *obd, struct lov_tgt_desc *tgt);
74
75 static void lov_putref(struct obd_device *obd)
76 {
77         struct lov_obd *lov = &obd->u.lov;
78
79         mutex_lock(&lov->lov_lock);
80         /* ok to dec to 0 more than once -- ltd_exp's will be null */
81         if (atomic_dec_and_test(&lov->lov_refcount) && lov->lov_death_row) {
82                 LIST_HEAD(kill);
83                 int i;
84                 struct lov_tgt_desc *tgt, *n;
85
86                 CDEBUG(D_CONFIG, "destroying %d lov targets\n",
87                        lov->lov_death_row);
88                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
89                         tgt = lov->lov_tgts[i];
90
91                         if (!tgt || !tgt->ltd_reap)
92                                 continue;
93                         list_add(&tgt->ltd_kill, &kill);
94                         /* XXX - right now there is a dependency on ld_tgt_count
95                          * being the maximum tgt index for computing the
96                          * mds_max_easize. So we can't shrink it.
97                          */
98                         lov_ost_pool_remove(&lov->lov_packed, i);
99                         lov->lov_tgts[i] = NULL;
100                         lov->lov_death_row--;
101                 }
102                 mutex_unlock(&lov->lov_lock);
103
104                 list_for_each_entry_safe(tgt, n, &kill, ltd_kill) {
105                         list_del(&tgt->ltd_kill);
106                         /* Disconnect */
107                         __lov_del_obd(obd, tgt);
108                 }
109
110                 if (lov->lov_tgts_kobj)
111                         kobject_put(lov->lov_tgts_kobj);
112
113         } else {
114                 mutex_unlock(&lov->lov_lock);
115         }
116 }
117
118 static int lov_set_osc_active(struct obd_device *obd, struct obd_uuid *uuid,
119                               enum obd_notify_event ev);
120 static int lov_notify(struct obd_device *obd, struct obd_device *watched,
121                       enum obd_notify_event ev, void *data);
122
123 int lov_connect_obd(struct obd_device *obd, __u32 index, int activate,
124                     struct obd_connect_data *data)
125 {
126         struct lov_obd *lov = &obd->u.lov;
127         struct obd_uuid *tgt_uuid;
128         struct obd_device *tgt_obd;
129         static struct obd_uuid lov_osc_uuid = { "LOV_OSC_UUID" };
130         struct obd_import *imp;
131         int rc;
132
133         if (!lov->lov_tgts[index])
134                 return -EINVAL;
135
136         tgt_uuid = &lov->lov_tgts[index]->ltd_uuid;
137         tgt_obd = lov->lov_tgts[index]->ltd_obd;
138
139         if (!tgt_obd->obd_set_up) {
140                 CERROR("Target %s not set up\n", obd_uuid2str(tgt_uuid));
141                 return -EINVAL;
142         }
143
144         /* override the sp_me from lov */
145         tgt_obd->u.cli.cl_sp_me = lov->lov_sp_me;
146
147         if (data && (data->ocd_connect_flags & OBD_CONNECT_INDEX))
148                 data->ocd_index = index;
149
150         /*
151          * Divine LOV knows that OBDs under it are OSCs.
152          */
153         imp = tgt_obd->u.cli.cl_import;
154
155         if (activate) {
156                 tgt_obd->obd_no_recov = 0;
157                 /* FIXME this is probably supposed to be
158                  * ptlrpc_set_import_active.  Horrible naming.
159                  */
160                 ptlrpc_activate_import(imp);
161         }
162
163         rc = obd_register_observer(tgt_obd, obd);
164         if (rc) {
165                 CERROR("Target %s register_observer error %d\n",
166                        obd_uuid2str(tgt_uuid), rc);
167                 return rc;
168         }
169
170         if (imp->imp_invalid) {
171                 CDEBUG(D_CONFIG, "not connecting OSC %s; administratively disabled\n",
172                        obd_uuid2str(tgt_uuid));
173                 return 0;
174         }
175
176         rc = obd_connect(NULL, &lov->lov_tgts[index]->ltd_exp, tgt_obd,
177                          &lov_osc_uuid, data, NULL);
178         if (rc || !lov->lov_tgts[index]->ltd_exp) {
179                 CERROR("Target %s connect error %d\n",
180                        obd_uuid2str(tgt_uuid), rc);
181                 return -ENODEV;
182         }
183
184         lov->lov_tgts[index]->ltd_reap = 0;
185
186         CDEBUG(D_CONFIG, "Connected tgt idx %d %s (%s) %sactive\n", index,
187                obd_uuid2str(tgt_uuid), tgt_obd->obd_name, activate ? "":"in");
188
189         if (lov->lov_tgts_kobj)
190                 /* Even if we failed, that's ok */
191                 rc = sysfs_create_link(lov->lov_tgts_kobj, &tgt_obd->obd_kobj,
192                                        tgt_obd->obd_name);
193
194         return 0;
195 }
196
197 static int lov_connect(const struct lu_env *env,
198                        struct obd_export **exp, struct obd_device *obd,
199                        struct obd_uuid *cluuid, struct obd_connect_data *data,
200                        void *localdata)
201 {
202         struct lov_obd *lov = &obd->u.lov;
203         struct lov_tgt_desc *tgt;
204         struct lustre_handle conn;
205         int i, rc;
206
207         CDEBUG(D_CONFIG, "connect #%d\n", lov->lov_connects);
208
209         rc = class_connect(&conn, obd, cluuid);
210         if (rc)
211                 return rc;
212
213         *exp = class_conn2export(&conn);
214
215         /* Why should there ever be more than 1 connect? */
216         lov->lov_connects++;
217         LASSERT(lov->lov_connects == 1);
218
219         memset(&lov->lov_ocd, 0, sizeof(lov->lov_ocd));
220         if (data)
221                 lov->lov_ocd = *data;
222
223         obd_getref(obd);
224
225         lov->lov_tgts_kobj = kobject_create_and_add("target_obds",
226                                                     &obd->obd_kobj);
227
228         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
229                 tgt = lov->lov_tgts[i];
230                 if (!tgt || obd_uuid_empty(&tgt->ltd_uuid))
231                         continue;
232                 /* Flags will be lowest common denominator */
233                 rc = lov_connect_obd(obd, i, tgt->ltd_activate, &lov->lov_ocd);
234                 if (rc) {
235                         CERROR("%s: lov connect tgt %d failed: %d\n",
236                                obd->obd_name, i, rc);
237                         continue;
238                 }
239                 /* connect to administrative disabled ost */
240                 if (!lov->lov_tgts[i]->ltd_exp)
241                         continue;
242
243                 rc = lov_notify(obd, lov->lov_tgts[i]->ltd_exp->exp_obd,
244                                 OBD_NOTIFY_CONNECT, (void *)&i);
245                 if (rc) {
246                         CERROR("%s error sending notify %d\n",
247                                obd->obd_name, rc);
248                 }
249         }
250         obd_putref(obd);
251
252         return 0;
253 }
254
255 static int lov_disconnect_obd(struct obd_device *obd, struct lov_tgt_desc *tgt)
256 {
257         struct lov_obd *lov = &obd->u.lov;
258         struct obd_device *osc_obd;
259         int rc;
260
261         osc_obd = class_exp2obd(tgt->ltd_exp);
262         CDEBUG(D_CONFIG, "%s: disconnecting target %s\n",
263                obd->obd_name, osc_obd ? osc_obd->obd_name : "NULL");
264
265         if (tgt->ltd_active) {
266                 tgt->ltd_active = 0;
267                 lov->desc.ld_active_tgt_count--;
268                 tgt->ltd_exp->exp_obd->obd_inactive = 1;
269         }
270
271         if (osc_obd) {
272                 if (lov->lov_tgts_kobj)
273                         sysfs_remove_link(lov->lov_tgts_kobj,
274                                           osc_obd->obd_name);
275
276                 /* Pass it on to our clients.
277                  * XXX This should be an argument to disconnect,
278                  * XXX not a back-door flag on the OBD.  Ah well.
279                  */
280                 osc_obd->obd_force = obd->obd_force;
281                 osc_obd->obd_fail = obd->obd_fail;
282                 osc_obd->obd_no_recov = obd->obd_no_recov;
283         }
284
285         obd_register_observer(osc_obd, NULL);
286
287         rc = obd_disconnect(tgt->ltd_exp);
288         if (rc) {
289                 CERROR("Target %s disconnect error %d\n",
290                        tgt->ltd_uuid.uuid, rc);
291                 rc = 0;
292         }
293
294         tgt->ltd_exp = NULL;
295         return 0;
296 }
297
298 static int lov_disconnect(struct obd_export *exp)
299 {
300         struct obd_device *obd = class_exp2obd(exp);
301         struct lov_obd *lov = &obd->u.lov;
302         int i, rc;
303
304         if (!lov->lov_tgts)
305                 goto out;
306
307         /* Only disconnect the underlying layers on the final disconnect. */
308         lov->lov_connects--;
309         if (lov->lov_connects != 0) {
310                 /* why should there be more than 1 connect? */
311                 CERROR("disconnect #%d\n", lov->lov_connects);
312                 goto out;
313         }
314
315         /* Let's hold another reference so lov_del_obd doesn't spin through
316          * putref every time
317          */
318         obd_getref(obd);
319
320         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
321                 if (lov->lov_tgts[i] && lov->lov_tgts[i]->ltd_exp) {
322                         /* Disconnection is the last we know about an obd */
323                         lov_del_target(obd, i, NULL, lov->lov_tgts[i]->ltd_gen);
324                 }
325         }
326
327         obd_putref(obd);
328
329 out:
330         rc = class_disconnect(exp); /* bz 9811 */
331         return rc;
332 }
333
334 /* Error codes:
335  *
336  *  -EINVAL  : UUID can't be found in the LOV's target list
337  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
338  *  -EBADF   : The UUID is found, but the OBD is the wrong type (!)
339  *  any >= 0 : is log target index
340  */
341 static int lov_set_osc_active(struct obd_device *obd, struct obd_uuid *uuid,
342                               enum obd_notify_event ev)
343 {
344         struct lov_obd *lov = &obd->u.lov;
345         struct lov_tgt_desc *tgt;
346         int index, activate, active;
347
348         CDEBUG(D_INFO, "Searching in lov %p for uuid %s event(%d)\n",
349                lov, uuid->uuid, ev);
350
351         obd_getref(obd);
352         for (index = 0; index < lov->desc.ld_tgt_count; index++) {
353                 tgt = lov->lov_tgts[index];
354                 if (!tgt)
355                         continue;
356                 /*
357                  * LU-642, initially inactive OSC could miss the obd_connect,
358                  * we make up for it here.
359                  */
360                 if (ev == OBD_NOTIFY_ACTIVATE && !tgt->ltd_exp &&
361                     obd_uuid_equals(uuid, &tgt->ltd_uuid)) {
362                         struct obd_uuid lov_osc_uuid = {"LOV_OSC_UUID"};
363
364                         obd_connect(NULL, &tgt->ltd_exp, tgt->ltd_obd,
365                                     &lov_osc_uuid, &lov->lov_ocd, NULL);
366                 }
367                 if (!tgt->ltd_exp)
368                         continue;
369
370                 CDEBUG(D_INFO, "lov idx %d is %s conn %#llx\n",
371                        index, obd_uuid2str(&tgt->ltd_uuid),
372                        tgt->ltd_exp->exp_handle.h_cookie);
373                 if (obd_uuid_equals(uuid, &tgt->ltd_uuid))
374                         break;
375         }
376
377         if (index == lov->desc.ld_tgt_count) {
378                 index = -EINVAL;
379                 goto out;
380         }
381
382         if (ev == OBD_NOTIFY_DEACTIVATE || ev == OBD_NOTIFY_ACTIVATE) {
383                 activate = (ev == OBD_NOTIFY_ACTIVATE) ? 1 : 0;
384
385                 if (lov->lov_tgts[index]->ltd_activate == activate) {
386                         CDEBUG(D_INFO, "OSC %s already %sactivate!\n",
387                                uuid->uuid, activate ? "" : "de");
388                 } else {
389                         lov->lov_tgts[index]->ltd_activate = activate;
390                         CDEBUG(D_CONFIG, "%sactivate OSC %s\n",
391                                activate ? "" : "de", obd_uuid2str(uuid));
392                 }
393
394         } else if (ev == OBD_NOTIFY_INACTIVE || ev == OBD_NOTIFY_ACTIVE) {
395                 active = (ev == OBD_NOTIFY_ACTIVE) ? 1 : 0;
396
397                 if (lov->lov_tgts[index]->ltd_active == active) {
398                         CDEBUG(D_INFO, "OSC %s already %sactive!\n",
399                                uuid->uuid, active ? "" : "in");
400                         goto out;
401                 }
402                 CDEBUG(D_CONFIG, "Marking OSC %s %sactive\n",
403                        obd_uuid2str(uuid), active ? "" : "in");
404
405                 lov->lov_tgts[index]->ltd_active = active;
406                 if (active) {
407                         lov->desc.ld_active_tgt_count++;
408                         lov->lov_tgts[index]->ltd_exp->exp_obd->obd_inactive = 0;
409                 } else {
410                         lov->desc.ld_active_tgt_count--;
411                         lov->lov_tgts[index]->ltd_exp->exp_obd->obd_inactive = 1;
412                 }
413         } else {
414                 CERROR("Unknown event(%d) for uuid %s", ev, uuid->uuid);
415         }
416
417  out:
418         obd_putref(obd);
419         return index;
420 }
421
422 static int lov_notify(struct obd_device *obd, struct obd_device *watched,
423                       enum obd_notify_event ev, void *data)
424 {
425         int rc = 0;
426         struct lov_obd *lov = &obd->u.lov;
427
428         down_read(&lov->lov_notify_lock);
429         if (!lov->lov_connects) {
430                 up_read(&lov->lov_notify_lock);
431                 return rc;
432         }
433
434         if (ev == OBD_NOTIFY_ACTIVE || ev == OBD_NOTIFY_INACTIVE ||
435             ev == OBD_NOTIFY_ACTIVATE || ev == OBD_NOTIFY_DEACTIVATE) {
436                 struct obd_uuid *uuid;
437
438                 LASSERT(watched);
439
440                 if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME)) {
441                         up_read(&lov->lov_notify_lock);
442                         CERROR("unexpected notification of %s %s!\n",
443                                watched->obd_type->typ_name,
444                                watched->obd_name);
445                         return -EINVAL;
446                 }
447                 uuid = &watched->u.cli.cl_target_uuid;
448
449                 /* Set OSC as active before notifying the observer, so the
450                  * observer can use the OSC normally.
451                  */
452                 rc = lov_set_osc_active(obd, uuid, ev);
453                 if (rc < 0) {
454                         up_read(&lov->lov_notify_lock);
455                         CERROR("event(%d) of %s failed: %d\n", ev,
456                                obd_uuid2str(uuid), rc);
457                         return rc;
458                 }
459                 /* active event should be pass lov target index as data */
460                 data = &rc;
461         }
462
463         /* Pass the notification up the chain. */
464         if (watched) {
465                 rc = obd_notify_observer(obd, watched, ev, data);
466         } else {
467                 /* NULL watched means all osc's in the lov (only for syncs) */
468                 /* sync event should be send lov idx as data */
469                 struct lov_obd *lov = &obd->u.lov;
470                 int i, is_sync;
471
472                 data = &i;
473                 is_sync = (ev == OBD_NOTIFY_SYNC) ||
474                           (ev == OBD_NOTIFY_SYNC_NONBLOCK);
475
476                 obd_getref(obd);
477                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
478                         if (!lov->lov_tgts[i])
479                                 continue;
480
481                         /* don't send sync event if target not
482                          * connected/activated
483                          */
484                         if (is_sync &&  !lov->lov_tgts[i]->ltd_active)
485                                 continue;
486
487                         rc = obd_notify_observer(obd, lov->lov_tgts[i]->ltd_obd,
488                                                  ev, data);
489                         if (rc) {
490                                 CERROR("%s: notify %s of %s failed %d\n",
491                                        obd->obd_name,
492                                        obd->obd_observer->obd_name,
493                                        lov->lov_tgts[i]->ltd_obd->obd_name,
494                                        rc);
495                         }
496                 }
497                 obd_putref(obd);
498         }
499
500         up_read(&lov->lov_notify_lock);
501         return rc;
502 }
503
504 static int lov_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
505                           __u32 index, int gen, int active)
506 {
507         struct lov_obd *lov = &obd->u.lov;
508         struct lov_tgt_desc *tgt;
509         struct obd_device *tgt_obd;
510         int rc;
511
512         CDEBUG(D_CONFIG, "uuid:%s idx:%d gen:%d active:%d\n",
513                uuidp->uuid, index, gen, active);
514
515         if (gen <= 0) {
516                 CERROR("request to add OBD %s with invalid generation: %d\n",
517                        uuidp->uuid, gen);
518                 return -EINVAL;
519         }
520
521         tgt_obd = class_find_client_obd(uuidp, LUSTRE_OSC_NAME,
522                                         &obd->obd_uuid);
523         if (!tgt_obd)
524                 return -EINVAL;
525
526         mutex_lock(&lov->lov_lock);
527
528         if ((index < lov->lov_tgt_size) && lov->lov_tgts[index]) {
529                 tgt = lov->lov_tgts[index];
530                 CERROR("UUID %s already assigned at LOV target index %d\n",
531                        obd_uuid2str(&tgt->ltd_uuid), index);
532                 mutex_unlock(&lov->lov_lock);
533                 return -EEXIST;
534         }
535
536         if (index >= lov->lov_tgt_size) {
537                 /* We need to reallocate the lov target array. */
538                 struct lov_tgt_desc **newtgts, **old = NULL;
539                 __u32 newsize, oldsize = 0;
540
541                 newsize = max_t(__u32, lov->lov_tgt_size, 2);
542                 while (newsize < index + 1)
543                         newsize <<= 1;
544                 newtgts = kcalloc(newsize, sizeof(*newtgts), GFP_NOFS);
545                 if (!newtgts) {
546                         mutex_unlock(&lov->lov_lock);
547                         return -ENOMEM;
548                 }
549
550                 if (lov->lov_tgt_size) {
551                         memcpy(newtgts, lov->lov_tgts, sizeof(*newtgts) *
552                                lov->lov_tgt_size);
553                         old = lov->lov_tgts;
554                         oldsize = lov->lov_tgt_size;
555                 }
556
557                 lov->lov_tgts = newtgts;
558                 lov->lov_tgt_size = newsize;
559                 smp_rmb();
560                 kfree(old);
561
562                 CDEBUG(D_CONFIG, "tgts: %p size: %d\n",
563                        lov->lov_tgts, lov->lov_tgt_size);
564         }
565
566         tgt = kzalloc(sizeof(*tgt), GFP_NOFS);
567         if (!tgt) {
568                 mutex_unlock(&lov->lov_lock);
569                 return -ENOMEM;
570         }
571
572         rc = lov_ost_pool_add(&lov->lov_packed, index, lov->lov_tgt_size);
573         if (rc) {
574                 mutex_unlock(&lov->lov_lock);
575                 kfree(tgt);
576                 return rc;
577         }
578
579         tgt->ltd_uuid = *uuidp;
580         tgt->ltd_obd = tgt_obd;
581         /* XXX - add a sanity check on the generation number. */
582         tgt->ltd_gen = gen;
583         tgt->ltd_index = index;
584         tgt->ltd_activate = active;
585         lov->lov_tgts[index] = tgt;
586         if (index >= lov->desc.ld_tgt_count)
587                 lov->desc.ld_tgt_count = index + 1;
588
589         mutex_unlock(&lov->lov_lock);
590
591         CDEBUG(D_CONFIG, "idx=%d ltd_gen=%d ld_tgt_count=%d\n",
592                index, tgt->ltd_gen, lov->desc.ld_tgt_count);
593
594         if (lov->lov_connects == 0) {
595                 /* lov_connect hasn't been called yet. We'll do the
596                  * lov_connect_obd on this target when that fn first runs,
597                  * because we don't know the connect flags yet.
598                  */
599                 return 0;
600         }
601
602         obd_getref(obd);
603
604         rc = lov_connect_obd(obd, index, active, &lov->lov_ocd);
605         if (rc)
606                 goto out;
607
608         /* connect to administrative disabled ost */
609         if (!tgt->ltd_exp) {
610                 rc = 0;
611                 goto out;
612         }
613
614         if (lov->lov_cache) {
615                 rc = obd_set_info_async(NULL, tgt->ltd_exp,
616                                         sizeof(KEY_CACHE_SET), KEY_CACHE_SET,
617                                         sizeof(struct cl_client_cache),
618                                         lov->lov_cache, NULL);
619                 if (rc < 0)
620                         goto out;
621         }
622
623         rc = lov_notify(obd, tgt->ltd_exp->exp_obd,
624                         active ? OBD_NOTIFY_CONNECT : OBD_NOTIFY_INACTIVE,
625                         (void *)&index);
626
627 out:
628         if (rc) {
629                 CERROR("add failed (%d), deleting %s\n", rc,
630                        obd_uuid2str(&tgt->ltd_uuid));
631                 lov_del_target(obd, index, NULL, 0);
632         }
633         obd_putref(obd);
634         return rc;
635 }
636
637 /* Schedule a target for deletion */
638 int lov_del_target(struct obd_device *obd, __u32 index,
639                    struct obd_uuid *uuidp, int gen)
640 {
641         struct lov_obd *lov = &obd->u.lov;
642         int count = lov->desc.ld_tgt_count;
643         int rc = 0;
644
645         if (index >= count) {
646                 CERROR("LOV target index %d >= number of LOV OBDs %d.\n",
647                        index, count);
648                 return -EINVAL;
649         }
650
651         /* to make sure there's no ongoing lov_notify() now */
652         down_write(&lov->lov_notify_lock);
653         obd_getref(obd);
654
655         if (!lov->lov_tgts[index]) {
656                 CERROR("LOV target at index %d is not setup.\n", index);
657                 rc = -EINVAL;
658                 goto out;
659         }
660
661         if (uuidp && !obd_uuid_equals(uuidp, &lov->lov_tgts[index]->ltd_uuid)) {
662                 CERROR("LOV target UUID %s at index %d doesn't match %s.\n",
663                        lov_uuid2str(lov, index), index,
664                        obd_uuid2str(uuidp));
665                 rc = -EINVAL;
666                 goto out;
667         }
668
669         CDEBUG(D_CONFIG, "uuid: %s idx: %d gen: %d exp: %p active: %d\n",
670                lov_uuid2str(lov, index), index,
671                lov->lov_tgts[index]->ltd_gen, lov->lov_tgts[index]->ltd_exp,
672                lov->lov_tgts[index]->ltd_active);
673
674         lov->lov_tgts[index]->ltd_reap = 1;
675         lov->lov_death_row++;
676         /* we really delete it from obd_putref */
677 out:
678         obd_putref(obd);
679         up_write(&lov->lov_notify_lock);
680
681         return rc;
682 }
683
684 static void __lov_del_obd(struct obd_device *obd, struct lov_tgt_desc *tgt)
685 {
686         struct obd_device *osc_obd;
687
688         LASSERT(tgt);
689         LASSERT(tgt->ltd_reap);
690
691         osc_obd = class_exp2obd(tgt->ltd_exp);
692
693         CDEBUG(D_CONFIG, "Removing tgt %s : %s\n",
694                tgt->ltd_uuid.uuid,
695                osc_obd ? osc_obd->obd_name : "<no obd>");
696
697         if (tgt->ltd_exp)
698                 lov_disconnect_obd(obd, tgt);
699
700         kfree(tgt);
701
702         /* Manual cleanup - no cleanup logs to clean up the osc's.  We must
703          * do it ourselves. And we can't do it from lov_cleanup,
704          * because we just lost our only reference to it.
705          */
706         if (osc_obd)
707                 class_manual_cleanup(osc_obd);
708 }
709
710 void lov_fix_desc_stripe_size(__u64 *val)
711 {
712         if (*val < LOV_MIN_STRIPE_SIZE) {
713                 if (*val != 0)
714                         LCONSOLE_INFO("Increasing default stripe size to minimum %u\n",
715                                       LOV_DESC_STRIPE_SIZE_DEFAULT);
716                 *val = LOV_DESC_STRIPE_SIZE_DEFAULT;
717         } else if (*val & (LOV_MIN_STRIPE_SIZE - 1)) {
718                 *val &= ~(LOV_MIN_STRIPE_SIZE - 1);
719                 LCONSOLE_WARN("Changing default stripe size to %llu (a multiple of %u)\n",
720                               *val, LOV_MIN_STRIPE_SIZE);
721         }
722 }
723
724 void lov_fix_desc_stripe_count(__u32 *val)
725 {
726         if (*val == 0)
727                 *val = 1;
728 }
729
730 void lov_fix_desc_pattern(__u32 *val)
731 {
732         /* from lov_setstripe */
733         if ((*val != 0) && (*val != LOV_PATTERN_RAID0)) {
734                 LCONSOLE_WARN("Unknown stripe pattern: %#x\n", *val);
735                 *val = 0;
736         }
737 }
738
739 void lov_fix_desc_qos_maxage(__u32 *val)
740 {
741         if (*val == 0)
742                 *val = LOV_DESC_QOS_MAXAGE_DEFAULT;
743 }
744
745 void lov_fix_desc(struct lov_desc *desc)
746 {
747         lov_fix_desc_stripe_size(&desc->ld_default_stripe_size);
748         lov_fix_desc_stripe_count(&desc->ld_default_stripe_count);
749         lov_fix_desc_pattern(&desc->ld_pattern);
750         lov_fix_desc_qos_maxage(&desc->ld_qos_maxage);
751 }
752
753 int lov_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
754 {
755         struct lprocfs_static_vars lvars = { NULL };
756         struct lov_desc *desc;
757         struct lov_obd *lov = &obd->u.lov;
758         int rc;
759
760         if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
761                 CERROR("LOV setup requires a descriptor\n");
762                 return -EINVAL;
763         }
764
765         desc = (struct lov_desc *)lustre_cfg_buf(lcfg, 1);
766
767         if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) {
768                 CERROR("descriptor size wrong: %d > %d\n",
769                        (int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1));
770                 return -EINVAL;
771         }
772
773         if (desc->ld_magic != LOV_DESC_MAGIC) {
774                 if (desc->ld_magic == __swab32(LOV_DESC_MAGIC)) {
775                         CDEBUG(D_OTHER, "%s: Swabbing lov desc %p\n",
776                                obd->obd_name, desc);
777                         lustre_swab_lov_desc(desc);
778                 } else {
779                         CERROR("%s: Bad lov desc magic: %#x\n",
780                                obd->obd_name, desc->ld_magic);
781                         return -EINVAL;
782                 }
783         }
784
785         lov_fix_desc(desc);
786
787         desc->ld_active_tgt_count = 0;
788         lov->desc = *desc;
789         lov->lov_tgt_size = 0;
790
791         mutex_init(&lov->lov_lock);
792         atomic_set(&lov->lov_refcount, 0);
793         lov->lov_sp_me = LUSTRE_SP_CLI;
794
795         init_rwsem(&lov->lov_notify_lock);
796
797         lov->lov_pools_hash_body = cfs_hash_create("POOLS", HASH_POOLS_CUR_BITS,
798                                                    HASH_POOLS_MAX_BITS,
799                                                    HASH_POOLS_BKT_BITS, 0,
800                                                    CFS_HASH_MIN_THETA,
801                                                    CFS_HASH_MAX_THETA,
802                                                    &pool_hash_operations,
803                                                    CFS_HASH_DEFAULT);
804         INIT_LIST_HEAD(&lov->lov_pool_list);
805         lov->lov_pool_count = 0;
806         rc = lov_ost_pool_init(&lov->lov_packed, 0);
807         if (rc)
808                 goto out;
809
810         lprocfs_lov_init_vars(&lvars);
811         lprocfs_obd_setup(obd, lvars.obd_vars, lvars.sysfs_vars);
812
813         rc = ldebugfs_seq_create(obd->obd_debugfs_entry, "target_obd",
814                                  0444, &lov_proc_target_fops, obd);
815         if (rc)
816                 CWARN("Error adding the target_obd file\n");
817
818         lov->lov_pool_debugfs_entry = ldebugfs_register("pools",
819                                                      obd->obd_debugfs_entry,
820                                                      NULL, NULL);
821         return 0;
822
823 out:
824         return rc;
825 }
826
827 static int lov_cleanup(struct obd_device *obd)
828 {
829         struct lov_obd *lov = &obd->u.lov;
830         struct list_head *pos, *tmp;
831         struct pool_desc *pool;
832
833         list_for_each_safe(pos, tmp, &lov->lov_pool_list) {
834                 pool = list_entry(pos, struct pool_desc, pool_list);
835                 /* free pool structs */
836                 CDEBUG(D_INFO, "delete pool %p\n", pool);
837                 /* In the function below, .hs_keycmp resolves to
838                  * pool_hashkey_keycmp()
839                  */
840                 /* coverity[overrun-buffer-val] */
841                 lov_pool_del(obd, pool->pool_name);
842         }
843         cfs_hash_putref(lov->lov_pools_hash_body);
844         lov_ost_pool_free(&lov->lov_packed);
845
846         lprocfs_obd_cleanup(obd);
847         if (lov->lov_tgts) {
848                 int i;
849
850                 obd_getref(obd);
851                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
852                         if (!lov->lov_tgts[i])
853                                 continue;
854
855                         /* Inactive targets may never have connected */
856                         if (lov->lov_tgts[i]->ltd_active ||
857                             atomic_read(&lov->lov_refcount))
858                             /* We should never get here - these
859                              * should have been removed in the
860                              * disconnect.
861                              */
862                                 CERROR("lov tgt %d not cleaned! deathrow=%d, lovrc=%d\n",
863                                        i, lov->lov_death_row,
864                                        atomic_read(&lov->lov_refcount));
865                         lov_del_target(obd, i, NULL, 0);
866                 }
867                 obd_putref(obd);
868                 kfree(lov->lov_tgts);
869                 lov->lov_tgt_size = 0;
870         }
871
872         if (lov->lov_cache) {
873                 cl_cache_decref(lov->lov_cache);
874                 lov->lov_cache = NULL;
875         }
876
877         return 0;
878 }
879
880 int lov_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg,
881                             __u32 *indexp, int *genp)
882 {
883         struct obd_uuid obd_uuid;
884         int cmd;
885         int rc = 0;
886
887         switch (cmd = lcfg->lcfg_command) {
888         case LCFG_LOV_ADD_OBD:
889         case LCFG_LOV_ADD_INA:
890         case LCFG_LOV_DEL_OBD: {
891                 __u32 index;
892                 int gen;
893                 /* lov_modify_tgts add  0:lov_mdsA  1:ost1_UUID  2:0  3:1 */
894                 if (LUSTRE_CFG_BUFLEN(lcfg, 1) > sizeof(obd_uuid.uuid)) {
895                         rc = -EINVAL;
896                         goto out;
897                 }
898
899                 obd_str2uuid(&obd_uuid,  lustre_cfg_buf(lcfg, 1));
900
901                 rc = kstrtoint(lustre_cfg_buf(lcfg, 2), 10, indexp);
902                 if (rc < 0)
903                         goto out;
904                 rc = kstrtoint(lustre_cfg_buf(lcfg, 3), 10, genp);
905                 if (rc < 0)
906                         goto out;
907                 index = *indexp;
908                 gen = *genp;
909                 if (cmd == LCFG_LOV_ADD_OBD)
910                         rc = lov_add_target(obd, &obd_uuid, index, gen, 1);
911                 else if (cmd == LCFG_LOV_ADD_INA)
912                         rc = lov_add_target(obd, &obd_uuid, index, gen, 0);
913                 else
914                         rc = lov_del_target(obd, index, &obd_uuid, gen);
915                 goto out;
916         }
917         case LCFG_PARAM: {
918                 struct lprocfs_static_vars lvars = { NULL };
919                 struct lov_desc *desc = &obd->u.lov.desc;
920
921                 if (!desc) {
922                         rc = -EINVAL;
923                         goto out;
924                 }
925
926                 lprocfs_lov_init_vars(&lvars);
927
928                 rc = class_process_proc_param(PARAM_LOV, lvars.obd_vars,
929                                               lcfg, obd);
930                 if (rc > 0)
931                         rc = 0;
932                 goto out;
933         }
934         case LCFG_POOL_NEW:
935         case LCFG_POOL_ADD:
936         case LCFG_POOL_DEL:
937         case LCFG_POOL_REM:
938                 goto out;
939
940         default: {
941                 CERROR("Unknown command: %d\n", lcfg->lcfg_command);
942                 rc = -EINVAL;
943                 goto out;
944         }
945         }
946 out:
947         return rc;
948 }
949
950 static int
951 lov_statfs_interpret(struct ptlrpc_request_set *rqset, void *data, int rc)
952 {
953         struct lov_request_set *lovset = (struct lov_request_set *)data;
954         int err;
955
956         if (rc)
957                 atomic_set(&lovset->set_completes, 0);
958
959         err = lov_fini_statfs_set(lovset);
960         return rc ? rc : err;
961 }
962
963 static int lov_statfs_async(struct obd_export *exp, struct obd_info *oinfo,
964                             __u64 max_age, struct ptlrpc_request_set *rqset)
965 {
966         struct obd_device      *obd = class_exp2obd(exp);
967         struct lov_request_set *set;
968         struct lov_request *req;
969         struct lov_obd *lov;
970         int rc = 0;
971
972         LASSERT(oinfo->oi_osfs);
973
974         lov = &obd->u.lov;
975         rc = lov_prep_statfs_set(obd, oinfo, &set);
976         if (rc)
977                 return rc;
978
979         list_for_each_entry(req, &set->set_list, rq_link) {
980                 rc = obd_statfs_async(lov->lov_tgts[req->rq_idx]->ltd_exp,
981                                       &req->rq_oi, max_age, rqset);
982                 if (rc)
983                         break;
984         }
985
986         if (rc || list_empty(&rqset->set_requests)) {
987                 int err;
988
989                 if (rc)
990                         atomic_set(&set->set_completes, 0);
991                 err = lov_fini_statfs_set(set);
992                 return rc ? rc : err;
993         }
994
995         LASSERT(!rqset->set_interpret);
996         rqset->set_interpret = lov_statfs_interpret;
997         rqset->set_arg = (void *)set;
998         return 0;
999 }
1000
1001 static int lov_statfs(const struct lu_env *env, struct obd_export *exp,
1002                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
1003 {
1004         struct ptlrpc_request_set *set = NULL;
1005         struct obd_info oinfo = {
1006                 .oi_osfs = osfs,
1007                 .oi_flags = flags,
1008         };
1009         int rc = 0;
1010
1011         /* for obdclass we forbid using obd_statfs_rqset, but prefer using async
1012          * statfs requests
1013          */
1014         set = ptlrpc_prep_set();
1015         if (!set)
1016                 return -ENOMEM;
1017
1018         rc = lov_statfs_async(exp, &oinfo, max_age, set);
1019         if (rc == 0)
1020                 rc = ptlrpc_set_wait(set);
1021         ptlrpc_set_destroy(set);
1022
1023         return rc;
1024 }
1025
1026 static int lov_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
1027                          void *karg, void __user *uarg)
1028 {
1029         struct obd_device *obddev = class_exp2obd(exp);
1030         struct lov_obd *lov = &obddev->u.lov;
1031         int i = 0, rc = 0, count = lov->desc.ld_tgt_count;
1032         struct obd_uuid *uuidp;
1033
1034         switch (cmd) {
1035         case IOC_OBD_STATFS: {
1036                 struct obd_ioctl_data *data = karg;
1037                 struct obd_device *osc_obd;
1038                 struct obd_statfs stat_buf = {0};
1039                 __u32 index;
1040                 __u32 flags;
1041
1042                 memcpy(&index, data->ioc_inlbuf2, sizeof(__u32));
1043                 if (index >= count)
1044                         return -ENODEV;
1045
1046                 if (!lov->lov_tgts[index])
1047                         /* Try again with the next index */
1048                         return -EAGAIN;
1049                 if (!lov->lov_tgts[index]->ltd_active)
1050                         return -ENODATA;
1051
1052                 osc_obd = class_exp2obd(lov->lov_tgts[index]->ltd_exp);
1053                 if (!osc_obd)
1054                         return -EINVAL;
1055
1056                 /* copy UUID */
1057                 if (copy_to_user(data->ioc_pbuf2, obd2cli_tgt(osc_obd),
1058                                  min_t(unsigned long, data->ioc_plen2,
1059                                        sizeof(struct obd_uuid))))
1060                         return -EFAULT;
1061
1062                 memcpy(&flags, data->ioc_inlbuf1, sizeof(__u32));
1063                 flags = flags & LL_STATFS_NODELAY ? OBD_STATFS_NODELAY : 0;
1064
1065                 /* got statfs data */
1066                 rc = obd_statfs(NULL, lov->lov_tgts[index]->ltd_exp, &stat_buf,
1067                                 cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS),
1068                                 flags);
1069                 if (rc)
1070                         return rc;
1071                 if (copy_to_user(data->ioc_pbuf1, &stat_buf,
1072                                  min_t(unsigned long, data->ioc_plen1,
1073                                        sizeof(stat_buf))))
1074                         return -EFAULT;
1075                 break;
1076         }
1077         case OBD_IOC_LOV_GET_CONFIG: {
1078                 struct obd_ioctl_data *data;
1079                 struct lov_desc *desc;
1080                 char *buf = NULL;
1081                 __u32 *genp;
1082
1083                 len = 0;
1084                 if (obd_ioctl_getdata(&buf, &len, uarg))
1085                         return -EINVAL;
1086
1087                 data = (struct obd_ioctl_data *)buf;
1088
1089                 if (sizeof(*desc) > data->ioc_inllen1) {
1090                         kvfree(buf);
1091                         return -EINVAL;
1092                 }
1093
1094                 if (sizeof(uuidp->uuid) * count > data->ioc_inllen2) {
1095                         kvfree(buf);
1096                         return -EINVAL;
1097                 }
1098
1099                 if (sizeof(__u32) * count > data->ioc_inllen3) {
1100                         kvfree(buf);
1101                         return -EINVAL;
1102                 }
1103
1104                 desc = (struct lov_desc *)data->ioc_inlbuf1;
1105                 memcpy(desc, &lov->desc, sizeof(*desc));
1106
1107                 uuidp = (struct obd_uuid *)data->ioc_inlbuf2;
1108                 genp = (__u32 *)data->ioc_inlbuf3;
1109                 /* the uuid will be empty for deleted OSTs */
1110                 for (i = 0; i < count; i++, uuidp++, genp++) {
1111                         if (!lov->lov_tgts[i])
1112                                 continue;
1113                         *uuidp = lov->lov_tgts[i]->ltd_uuid;
1114                         *genp = lov->lov_tgts[i]->ltd_gen;
1115                 }
1116
1117                 if (copy_to_user(uarg, buf, len))
1118                         rc = -EFAULT;
1119                 kvfree(buf);
1120                 break;
1121         }
1122         case OBD_IOC_QUOTACTL: {
1123                 struct if_quotactl *qctl = karg;
1124                 struct lov_tgt_desc *tgt = NULL;
1125                 struct obd_quotactl *oqctl;
1126
1127                 if (qctl->qc_valid == QC_OSTIDX) {
1128                         if (count <= qctl->qc_idx)
1129                                 return -EINVAL;
1130
1131                         tgt = lov->lov_tgts[qctl->qc_idx];
1132                         if (!tgt || !tgt->ltd_exp)
1133                                 return -EINVAL;
1134                 } else if (qctl->qc_valid == QC_UUID) {
1135                         for (i = 0; i < count; i++) {
1136                                 tgt = lov->lov_tgts[i];
1137                                 if (!tgt ||
1138                                     !obd_uuid_equals(&tgt->ltd_uuid,
1139                                                      &qctl->obd_uuid))
1140                                         continue;
1141
1142                                 if (!tgt->ltd_exp)
1143                                         return -EINVAL;
1144
1145                                 break;
1146                         }
1147                 } else {
1148                         return -EINVAL;
1149                 }
1150
1151                 if (i >= count)
1152                         return -EAGAIN;
1153
1154                 LASSERT(tgt && tgt->ltd_exp);
1155                 oqctl = kzalloc(sizeof(*oqctl), GFP_NOFS);
1156                 if (!oqctl)
1157                         return -ENOMEM;
1158
1159                 QCTL_COPY(oqctl, qctl);
1160                 rc = obd_quotactl(tgt->ltd_exp, oqctl);
1161                 if (rc == 0) {
1162                         QCTL_COPY(qctl, oqctl);
1163                         qctl->qc_valid = QC_OSTIDX;
1164                         qctl->obd_uuid = tgt->ltd_uuid;
1165                 }
1166                 kfree(oqctl);
1167                 break;
1168         }
1169         default: {
1170                 int set = 0;
1171
1172                 if (count == 0)
1173                         return -ENOTTY;
1174
1175                 for (i = 0; i < count; i++) {
1176                         int err;
1177                         struct obd_device *osc_obd;
1178
1179                         /* OST was disconnected */
1180                         if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
1181                                 continue;
1182
1183                         /* ll_umount_begin() sets force flag but for lov, not
1184                          * osc. Let's pass it through
1185                          */
1186                         osc_obd = class_exp2obd(lov->lov_tgts[i]->ltd_exp);
1187                         osc_obd->obd_force = obddev->obd_force;
1188                         err = obd_iocontrol(cmd, lov->lov_tgts[i]->ltd_exp,
1189                                             len, karg, uarg);
1190                         if (err) {
1191                                 if (lov->lov_tgts[i]->ltd_active) {
1192                                         CDEBUG(err == -ENOTTY ?
1193                                                D_IOCTL : D_WARNING,
1194                                                "iocontrol OSC %s on OST idx %d cmd %x: err = %d\n",
1195                                                lov_uuid2str(lov, i),
1196                                                i, cmd, err);
1197                                         if (!rc)
1198                                                 rc = err;
1199                                 }
1200                         } else {
1201                                 set = 1;
1202                         }
1203                 }
1204                 if (!set && !rc)
1205                         rc = -EIO;
1206         }
1207         }
1208
1209         return rc;
1210 }
1211
1212 static int lov_get_info(const struct lu_env *env, struct obd_export *exp,
1213                         __u32 keylen, void *key, __u32 *vallen, void *val)
1214 {
1215         struct obd_device *obddev = class_exp2obd(exp);
1216         struct lov_obd *lov = &obddev->u.lov;
1217         struct lov_desc *ld = &lov->desc;
1218         int rc = 0;
1219
1220         if (!vallen || !val)
1221                 return -EFAULT;
1222
1223         obd_getref(obddev);
1224
1225         if (KEY_IS(KEY_MAX_EASIZE)) {
1226                 u32 max_stripe_count = min_t(u32, ld->ld_active_tgt_count,
1227                                              LOV_MAX_STRIPE_COUNT);
1228
1229                 *((u32 *)val) = lov_mds_md_size(max_stripe_count, LOV_MAGIC_V3);
1230         } else if (KEY_IS(KEY_DEFAULT_EASIZE)) {
1231                 u32 def_stripe_count = min_t(u32, ld->ld_default_stripe_count,
1232                                              LOV_MAX_STRIPE_COUNT);
1233
1234                 *((u32 *)val) = lov_mds_md_size(def_stripe_count, LOV_MAGIC_V3);
1235         } else if (KEY_IS(KEY_TGT_COUNT)) {
1236                 *((int *)val) = lov->desc.ld_tgt_count;
1237         } else {
1238                 rc = -EINVAL;
1239         }
1240
1241         obd_putref(obddev);
1242         return rc;
1243 }
1244
1245 static int lov_set_info_async(const struct lu_env *env, struct obd_export *exp,
1246                               u32 keylen, void *key, u32 vallen,
1247                               void *val, struct ptlrpc_request_set *set)
1248 {
1249         struct obd_device *obddev = class_exp2obd(exp);
1250         struct lov_obd *lov = &obddev->u.lov;
1251         u32 count;
1252         int i, rc = 0, err;
1253         struct lov_tgt_desc *tgt;
1254         int do_inactive = 0, no_set = 0;
1255
1256         if (!set) {
1257                 no_set = 1;
1258                 set = ptlrpc_prep_set();
1259                 if (!set)
1260                         return -ENOMEM;
1261         }
1262
1263         obd_getref(obddev);
1264         count = lov->desc.ld_tgt_count;
1265
1266         if (KEY_IS(KEY_CHECKSUM)) {
1267                 do_inactive = 1;
1268         } else if (KEY_IS(KEY_CACHE_SET)) {
1269                 LASSERT(!lov->lov_cache);
1270                 lov->lov_cache = val;
1271                 do_inactive = 1;
1272                 cl_cache_incref(lov->lov_cache);
1273         }
1274
1275         for (i = 0; i < count; i++) {
1276                 tgt = lov->lov_tgts[i];
1277
1278                 /* OST was disconnected */
1279                 if (!tgt || !tgt->ltd_exp)
1280                         continue;
1281
1282                 /* OST is inactive and we don't want inactive OSCs */
1283                 if (!tgt->ltd_active && !do_inactive)
1284                         continue;
1285
1286                 err = obd_set_info_async(env, tgt->ltd_exp, keylen, key,
1287                                          vallen, val, set);
1288                 if (!rc)
1289                         rc = err;
1290         }
1291
1292         obd_putref(obddev);
1293         if (no_set) {
1294                 err = ptlrpc_set_wait(set);
1295                 if (!rc)
1296                         rc = err;
1297                 ptlrpc_set_destroy(set);
1298         }
1299         return rc;
1300 }
1301
1302 void lov_stripe_lock(struct lov_stripe_md *md)
1303                 __acquires(&md->lsm_lock)
1304 {
1305         LASSERT(md->lsm_lock_owner != current_pid());
1306         spin_lock(&md->lsm_lock);
1307         LASSERT(md->lsm_lock_owner == 0);
1308         md->lsm_lock_owner = current_pid();
1309 }
1310
1311 void lov_stripe_unlock(struct lov_stripe_md *md)
1312                 __releases(&md->lsm_lock)
1313 {
1314         LASSERT(md->lsm_lock_owner == current_pid());
1315         md->lsm_lock_owner = 0;
1316         spin_unlock(&md->lsm_lock);
1317 }
1318
1319 static int lov_quotactl(struct obd_device *obd, struct obd_export *exp,
1320                         struct obd_quotactl *oqctl)
1321 {
1322         struct lov_obd      *lov = &obd->u.lov;
1323         struct lov_tgt_desc *tgt;
1324         __u64           curspace = 0;
1325         __u64           bhardlimit = 0;
1326         int               i, rc = 0;
1327
1328         if (oqctl->qc_cmd != Q_GETOQUOTA &&
1329             oqctl->qc_cmd != LUSTRE_Q_SETQUOTA) {
1330                 CERROR("bad quota opc %x for lov obd\n", oqctl->qc_cmd);
1331                 return -EFAULT;
1332         }
1333
1334         /* for lov tgt */
1335         obd_getref(obd);
1336         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1337                 int err;
1338
1339                 tgt = lov->lov_tgts[i];
1340
1341                 if (!tgt)
1342                         continue;
1343
1344                 if (!tgt->ltd_active || tgt->ltd_reap) {
1345                         if (oqctl->qc_cmd == Q_GETOQUOTA &&
1346                             lov->lov_tgts[i]->ltd_activate) {
1347                                 rc = -EREMOTEIO;
1348                                 CERROR("ost %d is inactive\n", i);
1349                         } else {
1350                                 CDEBUG(D_HA, "ost %d is inactive\n", i);
1351                         }
1352                         continue;
1353                 }
1354
1355                 err = obd_quotactl(tgt->ltd_exp, oqctl);
1356                 if (err) {
1357                         if (tgt->ltd_active && !rc)
1358                                 rc = err;
1359                         continue;
1360                 }
1361
1362                 if (oqctl->qc_cmd == Q_GETOQUOTA) {
1363                         curspace += oqctl->qc_dqblk.dqb_curspace;
1364                         bhardlimit += oqctl->qc_dqblk.dqb_bhardlimit;
1365                 }
1366         }
1367         obd_putref(obd);
1368
1369         if (oqctl->qc_cmd == Q_GETOQUOTA) {
1370                 oqctl->qc_dqblk.dqb_curspace = curspace;
1371                 oqctl->qc_dqblk.dqb_bhardlimit = bhardlimit;
1372         }
1373         return rc;
1374 }
1375
1376 static struct obd_ops lov_obd_ops = {
1377         .owner          = THIS_MODULE,
1378         .setup          = lov_setup,
1379         .cleanup        = lov_cleanup,
1380         /*.process_config       = lov_process_config,*/
1381         .connect        = lov_connect,
1382         .disconnect     = lov_disconnect,
1383         .statfs         = lov_statfs,
1384         .statfs_async   = lov_statfs_async,
1385         .iocontrol      = lov_iocontrol,
1386         .get_info       = lov_get_info,
1387         .set_info_async = lov_set_info_async,
1388         .notify         = lov_notify,
1389         .pool_new       = lov_pool_new,
1390         .pool_rem       = lov_pool_remove,
1391         .pool_add       = lov_pool_add,
1392         .pool_del       = lov_pool_del,
1393         .getref         = lov_getref,
1394         .putref         = lov_putref,
1395         .quotactl       = lov_quotactl,
1396 };
1397
1398 struct kmem_cache *lov_oinfo_slab;
1399
1400 static int __init lov_init(void)
1401 {
1402         struct lprocfs_static_vars lvars = { NULL };
1403         int rc;
1404
1405         /* print an address of _any_ initialized kernel symbol from this
1406          * module, to allow debugging with gdb that doesn't support data
1407          * symbols from modules.
1408          */
1409         CDEBUG(D_INFO, "Lustre LOV module (%p).\n", &lov_caches);
1410
1411         rc = lu_kmem_init(lov_caches);
1412         if (rc)
1413                 return rc;
1414
1415         lov_oinfo_slab = kmem_cache_create("lov_oinfo",
1416                                            sizeof(struct lov_oinfo),
1417                                            0, SLAB_HWCACHE_ALIGN, NULL);
1418         if (!lov_oinfo_slab) {
1419                 lu_kmem_fini(lov_caches);
1420                 return -ENOMEM;
1421         }
1422         lprocfs_lov_init_vars(&lvars);
1423
1424         rc = class_register_type(&lov_obd_ops, NULL,
1425                                  LUSTRE_LOV_NAME, &lov_device_type);
1426
1427         if (rc) {
1428                 kmem_cache_destroy(lov_oinfo_slab);
1429                 lu_kmem_fini(lov_caches);
1430         }
1431
1432         return rc;
1433 }
1434
1435 static void /*__exit*/ lov_exit(void)
1436 {
1437         class_unregister_type(LUSTRE_LOV_NAME);
1438         kmem_cache_destroy(lov_oinfo_slab);
1439
1440         lu_kmem_fini(lov_caches);
1441 }
1442
1443 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
1444 MODULE_DESCRIPTION("Lustre Logical Object Volume");
1445 MODULE_LICENSE("GPL");
1446 MODULE_VERSION(LUSTRE_VERSION_STRING);
1447
1448 module_init(lov_init);
1449 module_exit(lov_exit);