GNU Linux-libre 4.9.304-gnu1
[releases.git] / drivers / staging / lustre / lustre / mgc / mgc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2015, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/mgc/mgc_request.c
33  *
34  * Author: Nathan Rutman <nathan@clusterfs.com>
35  */
36
37 #define DEBUG_SUBSYSTEM S_MGC
38 #define D_MGC D_CONFIG /*|D_WARNING*/
39
40 #include <linux/module.h>
41 #include "../include/obd_class.h"
42 #include "../include/lustre_dlm.h"
43 #include "../include/lprocfs_status.h"
44 #include "../include/lustre_log.h"
45 #include "../include/lustre_disk.h"
46
47 #include "mgc_internal.h"
48
49 static int mgc_name2resid(char *name, int len, struct ldlm_res_id *res_id,
50                           int type)
51 {
52         __u64 resname = 0;
53
54         if (len > sizeof(resname)) {
55                 CERROR("name too long: %s\n", name);
56                 return -EINVAL;
57         }
58         if (len <= 0) {
59                 CERROR("missing name: %s\n", name);
60                 return -EINVAL;
61         }
62         memcpy(&resname, name, len);
63
64         /* Always use the same endianness for the resid */
65         memset(res_id, 0, sizeof(*res_id));
66         res_id->name[0] = cpu_to_le64(resname);
67         /* XXX: unfortunately, sptlprc and config llog share one lock */
68         switch (type) {
69         case CONFIG_T_CONFIG:
70         case CONFIG_T_SPTLRPC:
71                 resname = 0;
72                 break;
73         case CONFIG_T_RECOVER:
74         case CONFIG_T_PARAMS:
75                 resname = type;
76                 break;
77         default:
78                 LBUG();
79         }
80         res_id->name[1] = cpu_to_le64(resname);
81         CDEBUG(D_MGC, "log %s to resid %#llx/%#llx (%.8s)\n", name,
82                res_id->name[0], res_id->name[1], (char *)&res_id->name[0]);
83         return 0;
84 }
85
/*
 * Translate a filesystem name into the resource id for lock family @type.
 *
 * The whole of @fsname is used, so it must be at most 8 characters long
 * (mgc_name2resid() rejects anything longer); it may contain "-",
 * e.g. "lustre", "SUN-000".
 */
int mgc_fsname2resid(char *fsname, struct ldlm_res_id *res_id, int type)
{
	int len = strlen(fsname);

	return mgc_name2resid(fsname, len, res_id, type);
}
EXPORT_SYMBOL(mgc_fsname2resid);
94
/*
 * Translate a log name into the resource id for lock family @type.
 *
 * A log name has the form "fsname-nodetype", e.g. "lustre-MDT0001" or
 * "SUN-000-client"; only the part before the last '-' is used.  A name
 * without any '-' (the "params" llog) is used in full.
 */
static int mgc_logname2resid(char *logname, struct ldlm_res_id *res_id, int type)
{
	char *last_dash = strrchr(logname, '-');
	int len;

	if (last_dash)
		len = last_dash - logname;
	else
		len = strlen(logname);

	return mgc_name2resid(logname, len, res_id, type);
}
111
/********************** config llog list **********************/
/* Every config_llog_data created in this file, linked via cld_list_chain. */
static LIST_HEAD(config_llog_list);
/* Guards config_llog_list; the requeue code also holds it while updating
 * rq_state.
 */
static DEFINE_SPINLOCK(config_list_lock);
115
116 /* Take a reference to a config log */
117 static int config_log_get(struct config_llog_data *cld)
118 {
119         atomic_inc(&cld->cld_refcount);
120         CDEBUG(D_INFO, "log %s refs %d\n", cld->cld_logname,
121                atomic_read(&cld->cld_refcount));
122         return 0;
123 }
124
125 /* Drop a reference to a config log.  When no longer referenced,
126  * we can free the config log data
127  */
128 static void config_log_put(struct config_llog_data *cld)
129 {
130         CDEBUG(D_INFO, "log %s refs %d\n", cld->cld_logname,
131                atomic_read(&cld->cld_refcount));
132         LASSERT(atomic_read(&cld->cld_refcount) > 0);
133
134         /* spinlock to make sure no item with 0 refcount in the list */
135         if (atomic_dec_and_lock(&cld->cld_refcount, &config_list_lock)) {
136                 list_del(&cld->cld_list_chain);
137                 spin_unlock(&config_list_lock);
138
139                 CDEBUG(D_MGC, "dropping config log %s\n", cld->cld_logname);
140
141                 if (cld->cld_recover)
142                         config_log_put(cld->cld_recover);
143                 if (cld->cld_sptlrpc)
144                         config_log_put(cld->cld_sptlrpc);
145                 if (cld->cld_params)
146                         config_log_put(cld->cld_params);
147                 if (cld_is_sptlrpc(cld))
148                         sptlrpc_conf_log_stop(cld->cld_logname);
149
150                 class_export_put(cld->cld_mgcexp);
151                 kfree(cld);
152         }
153 }
154
155 /* Find a config log by name */
156 static
157 struct config_llog_data *config_log_find(char *logname,
158                                          struct config_llog_instance *cfg)
159 {
160         struct config_llog_data *cld;
161         struct config_llog_data *found = NULL;
162         void *instance;
163
164         LASSERT(logname);
165
166         instance = cfg ? cfg->cfg_instance : NULL;
167         spin_lock(&config_list_lock);
168         list_for_each_entry(cld, &config_llog_list, cld_list_chain) {
169                 /* check if instance equals */
170                 if (instance != cld->cld_cfg.cfg_instance)
171                         continue;
172
173                 /* instance may be NULL, should check name */
174                 if (strcmp(logname, cld->cld_logname) == 0) {
175                         found = cld;
176                         break;
177                 }
178         }
179         if (found) {
180                 atomic_inc(&found->cld_refcount);
181                 LASSERT(found->cld_stopping == 0 || cld_is_sptlrpc(found) == 0);
182         }
183         spin_unlock(&config_list_lock);
184         return found;
185 }
186
187 static
188 struct config_llog_data *do_config_log_add(struct obd_device *obd,
189                                            char *logname,
190                                            int type,
191                                            struct config_llog_instance *cfg,
192                                            struct super_block *sb)
193 {
194         struct config_llog_data *cld;
195         int                   rc;
196
197         CDEBUG(D_MGC, "do adding config log %s:%p\n", logname,
198                cfg ? cfg->cfg_instance : NULL);
199
200         cld = kzalloc(sizeof(*cld) + strlen(logname) + 1, GFP_NOFS);
201         if (!cld)
202                 return ERR_PTR(-ENOMEM);
203
204         strcpy(cld->cld_logname, logname);
205         if (cfg)
206                 cld->cld_cfg = *cfg;
207         else
208                 cld->cld_cfg.cfg_callback = class_config_llog_handler;
209         mutex_init(&cld->cld_lock);
210         cld->cld_cfg.cfg_last_idx = 0;
211         cld->cld_cfg.cfg_flags = 0;
212         cld->cld_cfg.cfg_sb = sb;
213         cld->cld_type = type;
214         atomic_set(&cld->cld_refcount, 1);
215
216         /* Keep the mgc around until we are done */
217         cld->cld_mgcexp = class_export_get(obd->obd_self_export);
218
219         if (cld_is_sptlrpc(cld)) {
220                 sptlrpc_conf_log_start(logname);
221                 cld->cld_cfg.cfg_obdname = obd->obd_name;
222         }
223
224         rc = mgc_logname2resid(logname, &cld->cld_resid, type);
225
226         spin_lock(&config_list_lock);
227         list_add(&cld->cld_list_chain, &config_llog_list);
228         spin_unlock(&config_list_lock);
229
230         if (rc) {
231                 config_log_put(cld);
232                 return ERR_PTR(rc);
233         }
234
235         if (cld_is_sptlrpc(cld)) {
236                 rc = mgc_process_log(obd, cld);
237                 if (rc && rc != -ENOENT)
238                         CERROR("failed processing sptlrpc log: %d\n", rc);
239         }
240
241         return cld;
242 }
243
244 static struct config_llog_data *
245 config_recover_log_add(struct obd_device *obd, char *fsname,
246                        struct config_llog_instance *cfg,
247                        struct super_block *sb)
248 {
249         struct config_llog_instance lcfg = *cfg;
250         struct config_llog_data *cld;
251         char logname[32];
252
253         /* we have to use different llog for clients and mdts for cmd
254          * where only clients are notified if one of cmd server restarts
255          */
256         LASSERT(strlen(fsname) < sizeof(logname) / 2);
257         strcpy(logname, fsname);
258         LASSERT(lcfg.cfg_instance);
259         strcat(logname, "-cliir");
260
261         cld = do_config_log_add(obd, logname, CONFIG_T_RECOVER, &lcfg, sb);
262         return cld;
263 }
264
265 static struct config_llog_data *
266 config_params_log_add(struct obd_device *obd,
267                       struct config_llog_instance *cfg, struct super_block *sb)
268 {
269         struct config_llog_instance     lcfg = *cfg;
270         struct config_llog_data         *cld;
271
272         lcfg.cfg_instance = sb;
273
274         cld = do_config_log_add(obd, PARAMS_FILENAME, CONFIG_T_PARAMS,
275                                 &lcfg, sb);
276
277         return cld;
278 }
279
280 /** Add this log to the list of active logs watched by an MGC.
281  * Active means we're watching for updates.
282  * We have one active log per "mount" - client instance or servername.
283  * Each instance may be at a different point in the log.
284  */
285 static int config_log_add(struct obd_device *obd, char *logname,
286                           struct config_llog_instance *cfg,
287                           struct super_block *sb)
288 {
289         struct lustre_sb_info *lsi = s2lsi(sb);
290         struct config_llog_data *cld;
291         struct config_llog_data *sptlrpc_cld;
292         struct config_llog_data *params_cld;
293         char                    seclogname[32];
294         char                    *ptr;
295         int                     rc;
296
297         CDEBUG(D_MGC, "adding config log %s:%p\n", logname, cfg->cfg_instance);
298
299         /*
300          * for each regular log, the depended sptlrpc log name is
301          * <fsname>-sptlrpc. multiple regular logs may share one sptlrpc log.
302          */
303         ptr = strrchr(logname, '-');
304         if (!ptr || ptr - logname > 8) {
305                 CERROR("logname %s is too long\n", logname);
306                 return -EINVAL;
307         }
308
309         memcpy(seclogname, logname, ptr - logname);
310         strcpy(seclogname + (ptr - logname), "-sptlrpc");
311
312         sptlrpc_cld = config_log_find(seclogname, NULL);
313         if (!sptlrpc_cld) {
314                 sptlrpc_cld = do_config_log_add(obd, seclogname,
315                                                 CONFIG_T_SPTLRPC, NULL, NULL);
316                 if (IS_ERR(sptlrpc_cld)) {
317                         CERROR("can't create sptlrpc log: %s\n", seclogname);
318                         rc = PTR_ERR(sptlrpc_cld);
319                         goto out_err;
320                 }
321         }
322         params_cld = config_params_log_add(obd, cfg, sb);
323         if (IS_ERR(params_cld)) {
324                 rc = PTR_ERR(params_cld);
325                 CERROR("%s: can't create params log: rc = %d\n",
326                        obd->obd_name, rc);
327                 goto out_err1;
328         }
329
330         cld = do_config_log_add(obd, logname, CONFIG_T_CONFIG, cfg, sb);
331         if (IS_ERR(cld)) {
332                 CERROR("can't create log: %s\n", logname);
333                 rc = PTR_ERR(cld);
334                 goto out_err2;
335         }
336
337         cld->cld_sptlrpc = sptlrpc_cld;
338         cld->cld_params = params_cld;
339
340         LASSERT(lsi->lsi_lmd);
341         if (!(lsi->lsi_lmd->lmd_flags & LMD_FLG_NOIR)) {
342                 struct config_llog_data *recover_cld;
343
344                 ptr = strrchr(seclogname, '-');
345                 if (ptr) {
346                         *ptr = 0;
347                 } else {
348                         CERROR("%s: sptlrpc log name not correct, %s: rc = %d\n",
349                                obd->obd_name, seclogname, -EINVAL);
350                         config_log_put(cld);
351                         return -EINVAL;
352                 }
353                 recover_cld = config_recover_log_add(obd, seclogname, cfg, sb);
354                 if (IS_ERR(recover_cld)) {
355                         rc = PTR_ERR(recover_cld);
356                         goto out_err3;
357                 }
358                 cld->cld_recover = recover_cld;
359         }
360
361         return 0;
362
363 out_err3:
364         config_log_put(cld);
365
366 out_err2:
367         config_log_put(params_cld);
368
369 out_err1:
370         config_log_put(sptlrpc_cld);
371
372 out_err:
373         return rc;
374 }
375
/* Global (non-static) mutex; it has no users in the part of the file above.
 * NOTE(review): presumably serializes config llog processing - confirm
 * against its users.
 */
DEFINE_MUTEX(llog_process_lock);
377
378 /** Stop watching for updates on this log.
379  */
380 static int config_log_end(char *logname, struct config_llog_instance *cfg)
381 {
382         struct config_llog_data *cld;
383         struct config_llog_data *cld_sptlrpc = NULL;
384         struct config_llog_data *cld_params = NULL;
385         struct config_llog_data *cld_recover = NULL;
386         int rc = 0;
387
388         cld = config_log_find(logname, cfg);
389         if (!cld)
390                 return -ENOENT;
391
392         mutex_lock(&cld->cld_lock);
393         /*
394          * if cld_stopping is set, it means we didn't start the log thus
395          * not owning the start ref. this can happen after previous umount:
396          * the cld still hanging there waiting for lock cancel, and we
397          * remount again but failed in the middle and call log_end without
398          * calling start_log.
399          */
400         if (unlikely(cld->cld_stopping)) {
401                 mutex_unlock(&cld->cld_lock);
402                 /* drop the ref from the find */
403                 config_log_put(cld);
404                 return rc;
405         }
406
407         cld->cld_stopping = 1;
408
409         cld_recover = cld->cld_recover;
410         cld->cld_recover = NULL;
411         mutex_unlock(&cld->cld_lock);
412
413         if (cld_recover) {
414                 mutex_lock(&cld_recover->cld_lock);
415                 cld_recover->cld_stopping = 1;
416                 mutex_unlock(&cld_recover->cld_lock);
417                 config_log_put(cld_recover);
418         }
419
420         spin_lock(&config_list_lock);
421         cld_sptlrpc = cld->cld_sptlrpc;
422         cld->cld_sptlrpc = NULL;
423         cld_params = cld->cld_params;
424         cld->cld_params = NULL;
425         spin_unlock(&config_list_lock);
426
427         if (cld_sptlrpc)
428                 config_log_put(cld_sptlrpc);
429
430         if (cld_params) {
431                 mutex_lock(&cld_params->cld_lock);
432                 cld_params->cld_stopping = 1;
433                 mutex_unlock(&cld_params->cld_lock);
434                 config_log_put(cld_params);
435         }
436
437         /* drop the ref from the find */
438         config_log_put(cld);
439         /* drop the start ref */
440         config_log_put(cld);
441
442         CDEBUG(D_MGC, "end config log %s (%d)\n", logname ? logname : "client",
443                rc);
444         return rc;
445 }
446
447 int lprocfs_mgc_rd_ir_state(struct seq_file *m, void *data)
448 {
449         struct obd_device       *obd = data;
450         struct obd_import       *imp;
451         struct obd_connect_data *ocd;
452         struct config_llog_data *cld;
453         int rc;
454
455         rc = lprocfs_climp_check(obd);
456         if (rc)
457                 return rc;
458
459         imp = obd->u.cli.cl_import;
460         ocd = &imp->imp_connect_data;
461
462         seq_printf(m, "imperative_recovery: %s\n",
463                    OCD_HAS_FLAG(ocd, IMP_RECOV) ? "ENABLED" : "DISABLED");
464         seq_printf(m, "client_state:\n");
465
466         spin_lock(&config_list_lock);
467         list_for_each_entry(cld, &config_llog_list, cld_list_chain) {
468                 if (!cld->cld_recover)
469                         continue;
470                 seq_printf(m, "    - { client: %s, nidtbl_version: %u }\n",
471                            cld->cld_logname,
472                            cld->cld_recover->cld_cfg.cfg_last_idx);
473         }
474         spin_unlock(&config_list_lock);
475
476         up_read(&obd->u.cli.cl_sem);
477         return 0;
478 }
479
/* reenqueue any lost locks */
/* Bits of rq_state, which is updated under config_list_lock. */
/* set by mgc_requeue_thread() for as long as it is running */
#define RQ_RUNNING 0x1
/* a requeue was requested; ends the thread's idle wait */
#define RQ_NOW     0x2
/* only ever cleared in the code visible here, never set */
#define RQ_LATER   0x4
/* the last MGC is going away; the thread must exit */
#define RQ_STOP    0x8
/* an MGC is in precleanup; cuts the thread's timed wait short */
#define RQ_PRECLEANUP  0x10
static int rq_state;
/* the requeue thread sleeps on this queue */
static wait_queue_head_t            rq_waitq;
/* completed by the requeue thread right before it returns */
static DECLARE_COMPLETION(rq_exit);
/* completed by the requeue thread once, on its first loop iteration */
static DECLARE_COMPLETION(rq_start);
490
491 static void do_requeue(struct config_llog_data *cld)
492 {
493         LASSERT(atomic_read(&cld->cld_refcount) > 0);
494
495         /* Do not run mgc_process_log on a disconnected export or an
496          * export which is being disconnected. Take the client
497          * semaphore to make the check non-racy.
498          */
499         down_read_nested(&cld->cld_mgcexp->exp_obd->u.cli.cl_sem,
500                          OBD_CLI_SEM_MGC);
501
502         if (cld->cld_mgcexp->exp_obd->u.cli.cl_conn_count != 0) {
503                 int rc;
504
505                 CDEBUG(D_MGC, "updating log %s\n", cld->cld_logname);
506                 rc = mgc_process_log(cld->cld_mgcexp->exp_obd, cld);
507                 if (rc && rc != -ENOENT)
508                         CERROR("failed processing log: %d\n", rc);
509         } else {
510                 CDEBUG(D_MGC, "disconnecting, won't update log %s\n",
511                        cld->cld_logname);
512         }
513         up_read(&cld->cld_mgcexp->exp_obd->u.cli.cl_sem);
514 }
515
/* How long the MGC waits before it requeues config and recover locks with
 * the MGS: a fixed minimum plus a random part, so that clients do not all
 * flood the MGS at the same time.
 */
/* fixed part of the delay, in seconds */
#define MGC_TIMEOUT_MIN_SECONDS   5
/* mask applied to a random number to get the extra delay in centiseconds;
 * 0x1ff yields 0..511
 */
#define MGC_TIMEOUT_RAND_CENTISEC 0x1ff /* ~500 */
522
523 static int mgc_requeue_thread(void *data)
524 {
525         bool first = true;
526
527         CDEBUG(D_MGC, "Starting requeue thread\n");
528
529         /* Keep trying failed locks periodically */
530         spin_lock(&config_list_lock);
531         rq_state |= RQ_RUNNING;
532         while (1) {
533                 struct l_wait_info lwi;
534                 struct config_llog_data *cld, *cld_prev;
535                 int rand = cfs_rand() & MGC_TIMEOUT_RAND_CENTISEC;
536                 int stopped = !!(rq_state & RQ_STOP);
537                 int to;
538
539                 /* Any new or requeued lostlocks will change the state */
540                 rq_state &= ~(RQ_NOW | RQ_LATER);
541                 spin_unlock(&config_list_lock);
542
543                 if (first) {
544                         first = false;
545                         complete(&rq_start);
546                 }
547
548                 /* Always wait a few seconds to allow the server who
549                  * caused the lock revocation to finish its setup, plus some
550                  * random so everyone doesn't try to reconnect at once.
551                  */
552                 to = msecs_to_jiffies(MGC_TIMEOUT_MIN_SECONDS * MSEC_PER_SEC);
553                 /* rand is centi-seconds */
554                 to += msecs_to_jiffies(rand * MSEC_PER_SEC / 100);
555                 lwi = LWI_TIMEOUT(to, NULL, NULL);
556                 l_wait_event(rq_waitq, rq_state & (RQ_STOP | RQ_PRECLEANUP),
557                              &lwi);
558
559                 /*
560                  * iterate & processing through the list. for each cld, process
561                  * its depending sptlrpc cld firstly (if any) and then itself.
562                  *
563                  * it's guaranteed any item in the list must have
564                  * reference > 0; and if cld_lostlock is set, at
565                  * least one reference is taken by the previous enqueue.
566                  */
567                 cld_prev = NULL;
568
569                 spin_lock(&config_list_lock);
570                 rq_state &= ~RQ_PRECLEANUP;
571                 list_for_each_entry(cld, &config_llog_list, cld_list_chain) {
572                         if (!cld->cld_lostlock)
573                                 continue;
574
575                         spin_unlock(&config_list_lock);
576
577                         LASSERT(atomic_read(&cld->cld_refcount) > 0);
578
579                         /* Whether we enqueued again or not in mgc_process_log,
580                          * we're done with the ref from the old enqueue
581                          */
582                         if (cld_prev)
583                                 config_log_put(cld_prev);
584                         cld_prev = cld;
585
586                         cld->cld_lostlock = 0;
587                         if (likely(!stopped))
588                                 do_requeue(cld);
589
590                         spin_lock(&config_list_lock);
591                 }
592                 spin_unlock(&config_list_lock);
593                 if (cld_prev)
594                         config_log_put(cld_prev);
595
596                 /* break after scanning the list so that we can drop
597                  * refcount to losing lock clds
598                  */
599                 if (unlikely(stopped)) {
600                         spin_lock(&config_list_lock);
601                         break;
602                 }
603
604                 /* Wait a bit to see if anyone else needs a requeue */
605                 lwi = (struct l_wait_info) { 0 };
606                 l_wait_event(rq_waitq, rq_state & (RQ_NOW | RQ_STOP),
607                              &lwi);
608                 spin_lock(&config_list_lock);
609         }
610         /* spinlock and while guarantee RQ_NOW and RQ_LATER are not set */
611         rq_state &= ~RQ_RUNNING;
612         spin_unlock(&config_list_lock);
613
614         complete(&rq_exit);
615
616         CDEBUG(D_MGC, "Ending requeue thread\n");
617         return 0;
618 }
619
620 /* Add a cld to the list to requeue.  Start the requeue thread if needed.
621  * We are responsible for dropping the config log reference from here on out.
622  */
623 static void mgc_requeue_add(struct config_llog_data *cld)
624 {
625         CDEBUG(D_INFO, "log %s: requeue (r=%d sp=%d st=%x)\n",
626                cld->cld_logname, atomic_read(&cld->cld_refcount),
627                cld->cld_stopping, rq_state);
628         LASSERT(atomic_read(&cld->cld_refcount) > 0);
629
630         mutex_lock(&cld->cld_lock);
631         if (cld->cld_stopping || cld->cld_lostlock) {
632                 mutex_unlock(&cld->cld_lock);
633                 return;
634         }
635         /* this refcount will be released in mgc_requeue_thread. */
636         config_log_get(cld);
637         cld->cld_lostlock = 1;
638         mutex_unlock(&cld->cld_lock);
639
640         /* Hold lock for rq_state */
641         spin_lock(&config_list_lock);
642         if (rq_state & RQ_STOP) {
643                 spin_unlock(&config_list_lock);
644                 cld->cld_lostlock = 0;
645                 config_log_put(cld);
646         } else {
647                 rq_state |= RQ_NOW;
648                 spin_unlock(&config_list_lock);
649                 wake_up(&rq_waitq);
650         }
651 }
652
653 static int mgc_llog_init(const struct lu_env *env, struct obd_device *obd)
654 {
655         struct llog_ctxt        *ctxt;
656         int                      rc;
657
658         /* setup only remote ctxt, the local disk context is switched per each
659          * filesystem during mgc_fs_setup()
660          */
661         rc = llog_setup(env, obd, &obd->obd_olg, LLOG_CONFIG_REPL_CTXT, obd,
662                         &llog_client_ops);
663         if (rc)
664                 return rc;
665
666         ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
667         LASSERT(ctxt);
668
669         llog_initiator_connect(ctxt);
670         llog_ctxt_put(ctxt);
671
672         return 0;
673 }
674
675 static int mgc_llog_fini(const struct lu_env *env, struct obd_device *obd)
676 {
677         struct llog_ctxt *ctxt;
678
679         ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
680         if (ctxt)
681                 llog_cleanup(env, ctxt);
682
683         return 0;
684 }
685
/* Count of MGC devices: incremented in mgc_setup(), decremented in
 * mgc_precleanup().  The first one starts the requeue thread and the last
 * one stops it.
 */
static atomic_t mgc_count = ATOMIC_INIT(0);
687 static int mgc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
688 {
689         int rc = 0;
690         int temp;
691
692         switch (stage) {
693         case OBD_CLEANUP_EARLY:
694                 break;
695         case OBD_CLEANUP_EXPORTS:
696                 if (atomic_dec_and_test(&mgc_count)) {
697                         LASSERT(rq_state & RQ_RUNNING);
698                         /* stop requeue thread */
699                         temp = RQ_STOP;
700                 } else {
701                         /* wakeup requeue thread to clean our cld */
702                         temp = RQ_NOW | RQ_PRECLEANUP;
703                 }
704                 spin_lock(&config_list_lock);
705                 rq_state |= temp;
706                 spin_unlock(&config_list_lock);
707                 wake_up(&rq_waitq);
708                 if (temp & RQ_STOP)
709                         wait_for_completion(&rq_exit);
710                 obd_cleanup_client_import(obd);
711                 rc = mgc_llog_fini(NULL, obd);
712                 if (rc != 0)
713                         CERROR("failed to cleanup llogging subsystems\n");
714                 break;
715         }
716         return rc;
717 }
718
719 static int mgc_cleanup(struct obd_device *obd)
720 {
721         /* COMPAT_146 - old config logs may have added profiles we don't
722          * know about
723          */
724         if (obd->obd_type->typ_refcnt <= 1)
725                 /* Only for the last mgc */
726                 class_del_profiles();
727
728         lprocfs_obd_cleanup(obd);
729         ptlrpcd_decref();
730
731         return client_obd_cleanup(obd);
732 }
733
734 static int mgc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
735 {
736         struct lprocfs_static_vars lvars = { NULL };
737         struct task_struct *task;
738         int rc;
739
740         rc = ptlrpcd_addref();
741         if (rc < 0)
742                 goto err_noref;
743
744         rc = client_obd_setup(obd, lcfg);
745         if (rc)
746                 goto err_decref;
747
748         rc = mgc_llog_init(NULL, obd);
749         if (rc) {
750                 CERROR("failed to setup llogging subsystems\n");
751                 goto err_cleanup;
752         }
753
754         lprocfs_mgc_init_vars(&lvars);
755         lprocfs_obd_setup(obd, lvars.obd_vars, lvars.sysfs_vars);
756         sptlrpc_lprocfs_cliobd_attach(obd);
757
758         if (atomic_inc_return(&mgc_count) == 1) {
759                 rq_state = 0;
760                 init_waitqueue_head(&rq_waitq);
761
762                 /* start requeue thread */
763                 task = kthread_run(mgc_requeue_thread, NULL, "ll_cfg_requeue");
764                 if (IS_ERR(task)) {
765                         rc = PTR_ERR(task);
766                         CERROR("%s: cannot start requeue thread: rc = %d; no more log updates\n",
767                                obd->obd_name, rc);
768                         goto err_cleanup;
769                 }
770                 /* rc is the task_struct pointer of mgc_requeue_thread. */
771                 rc = 0;
772                 wait_for_completion(&rq_start);
773         }
774
775         return rc;
776
777 err_cleanup:
778         client_obd_cleanup(obd);
779 err_decref:
780         ptlrpcd_decref();
781 err_noref:
782         return rc;
783 }
784
785 /* based on ll_mdc_blocking_ast */
786 static int mgc_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
787                             void *data, int flag)
788 {
789         struct lustre_handle lockh;
790         struct config_llog_data *cld = data;
791         int rc = 0;
792
793         switch (flag) {
794         case LDLM_CB_BLOCKING:
795                 /* mgs wants the lock, give it up... */
796                 LDLM_DEBUG(lock, "MGC blocking CB");
797                 ldlm_lock2handle(lock, &lockh);
798                 rc = ldlm_cli_cancel(&lockh, LCF_ASYNC);
799                 break;
800         case LDLM_CB_CANCELING:
801                 /* We've given up the lock, prepare ourselves to update. */
802                 LDLM_DEBUG(lock, "MGC cancel CB");
803
804                 CDEBUG(D_MGC, "Lock res "DLDLMRES" (%.8s)\n",
805                        PLDLMRES(lock->l_resource),
806                        (char *)&lock->l_resource->lr_name.name[0]);
807
808                 if (!cld) {
809                         CDEBUG(D_INFO, "missing data, won't requeue\n");
810                         break;
811                 }
812
813                 /* held at mgc_process_log(). */
814                 LASSERT(atomic_read(&cld->cld_refcount) > 0);
815                 /* Are we done with this log? */
816                 if (cld->cld_stopping) {
817                         CDEBUG(D_MGC, "log %s: stopping, won't requeue\n",
818                                cld->cld_logname);
819                         config_log_put(cld);
820                         break;
821                 }
822                 /* Make sure not to re-enqueue when the mgc is stopping
823                  * (we get called from client_disconnect_export)
824                  */
825                 if (!lock->l_conn_export ||
826                     !lock->l_conn_export->exp_obd->u.cli.cl_conn_count) {
827                         CDEBUG(D_MGC, "log %.8s: disconnecting, won't requeue\n",
828                                cld->cld_logname);
829                         config_log_put(cld);
830                         break;
831                 }
832
833                 /* Re-enqueue now */
834                 mgc_requeue_add(cld);
835                 config_log_put(cld);
836                 break;
837         default:
838                 LBUG();
839         }
840
841         return rc;
842 }
843
/* Not sure where this should go... */
/* This is the timeout value for MGS_CONNECT request plus a ping interval, such
 * that we can have a chance to try the secondary MGS if any.
 */
#define  MGC_ENQUEUE_LIMIT (INITIAL_CONNECT_TIMEOUT + (AT_OFF ? 0 : at_min) \
				+ PING_INTERVAL)
/* not used in the part of the file above.
 * NOTE(review): presumably the delay limit for target registration - confirm.
 */
#define  MGC_TARGET_REG_LIMIT 10
/* assigned to rq_delay_limit of the MGS_SET_INFO request in
 * mgc_set_mgs_param().  NOTE(review): units presumably seconds - confirm.
 */
#define  MGC_SEND_PARAM_LIMIT 10
852
853 /* Send parameter to MGS*/
854 static int mgc_set_mgs_param(struct obd_export *exp,
855                              struct mgs_send_param *msp)
856 {
857         struct ptlrpc_request *req;
858         struct mgs_send_param *req_msp, *rep_msp;
859         int rc;
860
861         req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
862                                         &RQF_MGS_SET_INFO, LUSTRE_MGS_VERSION,
863                                         MGS_SET_INFO);
864         if (!req)
865                 return -ENOMEM;
866
867         req_msp = req_capsule_client_get(&req->rq_pill, &RMF_MGS_SEND_PARAM);
868         if (!req_msp) {
869                 ptlrpc_req_finished(req);
870                 return -ENOMEM;
871         }
872
873         memcpy(req_msp, msp, sizeof(*req_msp));
874         ptlrpc_request_set_replen(req);
875
876         /* Limit how long we will wait for the enqueue to complete */
877         req->rq_delay_limit = MGC_SEND_PARAM_LIMIT;
878         rc = ptlrpc_queue_wait(req);
879         if (!rc) {
880                 rep_msp = req_capsule_server_get(&req->rq_pill, &RMF_MGS_SEND_PARAM);
881                 memcpy(msp, rep_msp, sizeof(*rep_msp));
882         }
883
884         ptlrpc_req_finished(req);
885
886         return rc;
887 }
888
889 /* Take a config lock so we can get cancel notifications */
890 static int mgc_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm,
891                        __u32 type, ldlm_policy_data_t *policy, __u32 mode,
892                        __u64 *flags, void *bl_cb, void *cp_cb, void *gl_cb,
893                        void *data, __u32 lvb_len, void *lvb_swabber,
894                        struct lustre_handle *lockh)
895 {
896         struct config_llog_data *cld = data;
897         struct ldlm_enqueue_info einfo = {
898                 .ei_type        = type,
899                 .ei_mode        = mode,
900                 .ei_cb_bl       = mgc_blocking_ast,
901                 .ei_cb_cp       = ldlm_completion_ast,
902         };
903         struct ptlrpc_request *req;
904         int short_limit = cld_is_sptlrpc(cld);
905         int rc;
906
907         CDEBUG(D_MGC, "Enqueue for %s (res %#llx)\n", cld->cld_logname,
908                cld->cld_resid.name[0]);
909
910         /* We need a callback for every lockholder, so don't try to
911          * ldlm_lock_match (see rev 1.1.2.11.2.47)
912          */
913         req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
914                                         &RQF_LDLM_ENQUEUE, LUSTRE_DLM_VERSION,
915                                         LDLM_ENQUEUE);
916         if (!req)
917                 return -ENOMEM;
918
919         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, 0);
920         ptlrpc_request_set_replen(req);
921
922         /* Limit how long we will wait for the enqueue to complete */
923         req->rq_delay_limit = short_limit ? 5 : MGC_ENQUEUE_LIMIT;
924         rc = ldlm_cli_enqueue(exp, &req, &einfo, &cld->cld_resid, NULL, flags,
925                               NULL, 0, LVB_T_NONE, lockh, 0);
926         /* A failed enqueue should still call the mgc_blocking_ast,
927          * where it will be requeued if needed ("grant failed").
928          */
929         ptlrpc_req_finished(req);
930         return rc;
931 }
932
933 static void mgc_notify_active(struct obd_device *unused)
934 {
935         /* wakeup mgc_requeue_thread to requeue mgc lock */
936         spin_lock(&config_list_lock);
937         rq_state |= RQ_NOW;
938         spin_unlock(&config_list_lock);
939         wake_up(&rq_waitq);
940
941         /* TODO: Help the MGS rebuild nidtbl. -jay */
942 }
943
944 /* Send target_reg message to MGS */
945 static int mgc_target_register(struct obd_export *exp,
946                                struct mgs_target_info *mti)
947 {
948         struct ptlrpc_request  *req;
949         struct mgs_target_info *req_mti, *rep_mti;
950         int                  rc;
951
952         req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
953                                         &RQF_MGS_TARGET_REG, LUSTRE_MGS_VERSION,
954                                         MGS_TARGET_REG);
955         if (!req)
956                 return -ENOMEM;
957
958         req_mti = req_capsule_client_get(&req->rq_pill, &RMF_MGS_TARGET_INFO);
959         if (!req_mti) {
960                 ptlrpc_req_finished(req);
961                 return -ENOMEM;
962         }
963
964         memcpy(req_mti, mti, sizeof(*req_mti));
965         ptlrpc_request_set_replen(req);
966         CDEBUG(D_MGC, "register %s\n", mti->mti_svname);
967         /* Limit how long we will wait for the enqueue to complete */
968         req->rq_delay_limit = MGC_TARGET_REG_LIMIT;
969
970         rc = ptlrpc_queue_wait(req);
971         if (!rc) {
972                 rep_mti = req_capsule_server_get(&req->rq_pill,
973                                                  &RMF_MGS_TARGET_INFO);
974                 memcpy(mti, rep_mti, sizeof(*rep_mti));
975                 CDEBUG(D_MGC, "register %s got index = %d\n",
976                        mti->mti_svname, mti->mti_stripe_index);
977         }
978         ptlrpc_req_finished(req);
979
980         return rc;
981 }
982
983 static int mgc_set_info_async(const struct lu_env *env, struct obd_export *exp,
984                               u32 keylen, void *key, u32 vallen,
985                               void *val, struct ptlrpc_request_set *set)
986 {
987         int rc = -EINVAL;
988
989         /* Turn off initial_recov after we try all backup servers once */
990         if (KEY_IS(KEY_INIT_RECOV_BACKUP)) {
991                 struct obd_import *imp = class_exp2cliimp(exp);
992                 int value;
993
994                 if (vallen != sizeof(int))
995                         return -EINVAL;
996                 value = *(int *)val;
997                 CDEBUG(D_MGC, "InitRecov %s %d/d%d:i%d:r%d:or%d:%s\n",
998                        imp->imp_obd->obd_name, value,
999                        imp->imp_deactive, imp->imp_invalid,
1000                        imp->imp_replayable, imp->imp_obd->obd_replayable,
1001                        ptlrpc_import_state_name(imp->imp_state));
1002                 /* Resurrect if we previously died */
1003                 if ((imp->imp_state != LUSTRE_IMP_FULL &&
1004                      imp->imp_state != LUSTRE_IMP_NEW) || value > 1)
1005                         ptlrpc_reconnect_import(imp);
1006                 return 0;
1007         }
1008         if (KEY_IS(KEY_SET_INFO)) {
1009                 struct mgs_send_param *msp;
1010
1011                 msp = val;
1012                 rc =  mgc_set_mgs_param(exp, msp);
1013                 return rc;
1014         }
1015         if (KEY_IS(KEY_MGSSEC)) {
1016                 struct client_obd     *cli = &exp->exp_obd->u.cli;
1017                 struct sptlrpc_flavor  flvr;
1018
1019                 /*
1020                  * empty string means using current flavor, if which haven't
1021                  * been set yet, set it as null.
1022                  *
1023                  * if flavor has been set previously, check the asking flavor
1024                  * must match the existing one.
1025                  */
1026                 if (vallen == 0) {
1027                         if (cli->cl_flvr_mgc.sf_rpc != SPTLRPC_FLVR_INVALID)
1028                                 return 0;
1029                         val = "null";
1030                         vallen = 4;
1031                 }
1032
1033                 rc = sptlrpc_parse_flavor(val, &flvr);
1034                 if (rc) {
1035                         CERROR("invalid sptlrpc flavor %s to MGS\n",
1036                                (char *)val);
1037                         return rc;
1038                 }
1039
1040                 /*
1041                  * caller already hold a mutex
1042                  */
1043                 if (cli->cl_flvr_mgc.sf_rpc == SPTLRPC_FLVR_INVALID) {
1044                         cli->cl_flvr_mgc = flvr;
1045                 } else if (memcmp(&cli->cl_flvr_mgc, &flvr,
1046                                   sizeof(flvr)) != 0) {
1047                         char    str[20];
1048
1049                         sptlrpc_flavor2name(&cli->cl_flvr_mgc,
1050                                             str, sizeof(str));
1051                         LCONSOLE_ERROR("asking sptlrpc flavor %s to MGS but currently %s is in use\n",
1052                                        (char *)val, str);
1053                         rc = -EPERM;
1054                 }
1055                 return rc;
1056         }
1057
1058         return rc;
1059 }
1060
1061 static int mgc_get_info(const struct lu_env *env, struct obd_export *exp,
1062                         __u32 keylen, void *key, __u32 *vallen, void *val,
1063                         struct lov_stripe_md *unused)
1064 {
1065         int rc = -EINVAL;
1066
1067         if (KEY_IS(KEY_CONN_DATA)) {
1068                 struct obd_import *imp = class_exp2cliimp(exp);
1069                 struct obd_connect_data *data = val;
1070
1071                 if (*vallen == sizeof(*data)) {
1072                         *data = imp->imp_connect_data;
1073                         rc = 0;
1074                 }
1075         }
1076
1077         return rc;
1078 }
1079
1080 static int mgc_import_event(struct obd_device *obd,
1081                             struct obd_import *imp,
1082                             enum obd_import_event event)
1083 {
1084         LASSERT(imp->imp_obd == obd);
1085         CDEBUG(D_MGC, "import event %#x\n", event);
1086
1087         switch (event) {
1088         case IMP_EVENT_DISCON:
1089                 /* MGC imports should not wait for recovery */
1090                 if (OCD_HAS_FLAG(&imp->imp_connect_data, IMP_RECOV))
1091                         ptlrpc_pinger_ir_down();
1092                 break;
1093         case IMP_EVENT_INACTIVE:
1094                 break;
1095         case IMP_EVENT_INVALIDATE: {
1096                 struct ldlm_namespace *ns = obd->obd_namespace;
1097
1098                 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
1099                 break;
1100         }
1101         case IMP_EVENT_ACTIVE:
1102                 CDEBUG(D_INFO, "%s: Reactivating import\n", obd->obd_name);
1103                 /* Clearing obd_no_recov allows us to continue pinging */
1104                 obd->obd_no_recov = 0;
1105                 mgc_notify_active(obd);
1106                 if (OCD_HAS_FLAG(&imp->imp_connect_data, IMP_RECOV))
1107                         ptlrpc_pinger_ir_up();
1108                 break;
1109         case IMP_EVENT_OCD:
1110                 break;
1111         case IMP_EVENT_DEACTIVATE:
1112         case IMP_EVENT_ACTIVATE:
1113                 break;
1114         default:
1115                 CERROR("Unknown import event %#x\n", event);
1116                 LBUG();
1117         }
1118         return 0;
1119 }
1120
/*
 * Number of pages allocated for the bulk buffer of a CONFIG_READ request
 * (see mgc_process_recover_log()).
 */
enum {
        /* first read of a log (cfg_last_idx == 0): 2^20 bytes worth of pages */
        CONFIG_READ_NRPAGES_INIT = 1 << (20 - PAGE_SHIFT),
        /* subsequent, incremental reads */
        CONFIG_READ_NRPAGES      = 4
};
1125
1126 static int mgc_apply_recover_logs(struct obd_device *mgc,
1127                                   struct config_llog_data *cld,
1128                                   __u64 max_version,
1129                                   void *data, int datalen, bool mne_swab)
1130 {
1131         struct config_llog_instance *cfg = &cld->cld_cfg;
1132         struct mgs_nidtbl_entry *entry;
1133         struct lustre_cfg       *lcfg;
1134         struct lustre_cfg_bufs   bufs;
1135         u64   prev_version = 0;
1136         char *inst;
1137         char *buf;
1138         int   bufsz;
1139         int   pos;
1140         int   rc  = 0;
1141         int   off = 0;
1142
1143         LASSERT(cfg->cfg_instance);
1144         LASSERT(cfg->cfg_sb == cfg->cfg_instance);
1145
1146         inst = kzalloc(PAGE_SIZE, GFP_KERNEL);
1147         if (!inst)
1148                 return -ENOMEM;
1149
1150         pos = snprintf(inst, PAGE_SIZE, "%p", cfg->cfg_instance);
1151         if (pos >= PAGE_SIZE) {
1152                 kfree(inst);
1153                 return -E2BIG;
1154         }
1155
1156         ++pos;
1157         buf   = inst + pos;
1158         bufsz = PAGE_SIZE - pos;
1159
1160         while (datalen > 0) {
1161                 int   entry_len = sizeof(*entry);
1162                 int is_ost, i;
1163                 struct obd_device *obd;
1164                 char *obdname;
1165                 char *cname;
1166                 char *params;
1167                 char *uuid;
1168
1169                 rc = -EINVAL;
1170                 if (datalen < sizeof(*entry))
1171                         break;
1172
1173                 entry = (typeof(entry))(data + off);
1174
1175                 /* sanity check */
1176                 if (entry->mne_nid_type != 0) /* only support type 0 for ipv4 */
1177                         break;
1178                 if (entry->mne_nid_count == 0) /* at least one nid entry */
1179                         break;
1180                 if (entry->mne_nid_size != sizeof(lnet_nid_t))
1181                         break;
1182
1183                 entry_len += entry->mne_nid_count * entry->mne_nid_size;
1184                 if (datalen < entry_len) /* must have entry_len at least */
1185                         break;
1186
1187                 /* Keep this swab for normal mixed endian handling. LU-1644 */
1188                 if (mne_swab)
1189                         lustre_swab_mgs_nidtbl_entry(entry);
1190                 if (entry->mne_length > PAGE_SIZE) {
1191                         CERROR("MNE too large (%u)\n", entry->mne_length);
1192                         break;
1193                 }
1194
1195                 if (entry->mne_length < entry_len)
1196                         break;
1197
1198                 off     += entry->mne_length;
1199                 datalen -= entry->mne_length;
1200                 if (datalen < 0)
1201                         break;
1202
1203                 if (entry->mne_version > max_version) {
1204                         CERROR("entry index(%lld) is over max_index(%lld)\n",
1205                                entry->mne_version, max_version);
1206                         break;
1207                 }
1208
1209                 if (prev_version >= entry->mne_version) {
1210                         CERROR("index unsorted, prev %lld, now %lld\n",
1211                                prev_version, entry->mne_version);
1212                         break;
1213                 }
1214                 prev_version = entry->mne_version;
1215
1216                 /*
1217                  * Write a string with format "nid::instance" to
1218                  * lustre/<osc|mdc>/<target>-<osc|mdc>-<instance>/import.
1219                  */
1220
1221                 is_ost = entry->mne_type == LDD_F_SV_TYPE_OST;
1222                 memset(buf, 0, bufsz);
1223                 obdname = buf;
1224                 pos = 0;
1225
1226                 /* lustre-OST0001-osc-<instance #> */
1227                 strcpy(obdname, cld->cld_logname);
1228                 cname = strrchr(obdname, '-');
1229                 if (!cname) {
1230                         CERROR("mgc %s: invalid logname %s\n",
1231                                mgc->obd_name, obdname);
1232                         break;
1233                 }
1234
1235                 pos = cname - obdname;
1236                 obdname[pos] = 0;
1237                 pos += sprintf(obdname + pos, "-%s%04x",
1238                                   is_ost ? "OST" : "MDT", entry->mne_index);
1239
1240                 cname = is_ost ? "osc" : "mdc";
1241                 pos += sprintf(obdname + pos, "-%s-%s", cname, inst);
1242                 lustre_cfg_bufs_reset(&bufs, obdname);
1243
1244                 /* find the obd by obdname */
1245                 obd = class_name2obd(obdname);
1246                 if (!obd) {
1247                         CDEBUG(D_INFO, "mgc %s: cannot find obdname %s\n",
1248                                mgc->obd_name, obdname);
1249                         rc = 0;
1250                         /* this is a safe race, when the ost is starting up...*/
1251                         continue;
1252                 }
1253
1254                 /* osc.import = "connection=<Conn UUID>::<target instance>" */
1255                 ++pos;
1256                 params = buf + pos;
1257                 pos += sprintf(params, "%s.import=%s", cname, "connection=");
1258                 uuid = buf + pos;
1259
1260                 down_read(&obd->u.cli.cl_sem);
1261                 if (!obd->u.cli.cl_import) {
1262                         /* client does not connect to the OST yet */
1263                         up_read(&obd->u.cli.cl_sem);
1264                         rc = 0;
1265                         continue;
1266                 }
1267
1268                 /* iterate all nids to find one */
1269                 /* find uuid by nid */
1270                 rc = -ENOENT;
1271                 for (i = 0; i < entry->mne_nid_count; i++) {
1272                         rc = client_import_find_conn(obd->u.cli.cl_import,
1273                                                      entry->u.nids[0],
1274                                                      (struct obd_uuid *)uuid);
1275                         if (!rc)
1276                                 break;
1277                 }
1278
1279                 up_read(&obd->u.cli.cl_sem);
1280                 if (rc < 0) {
1281                         CERROR("mgc: cannot find uuid by nid %s\n",
1282                                libcfs_nid2str(entry->u.nids[0]));
1283                         break;
1284                 }
1285
1286                 CDEBUG(D_INFO, "Find uuid %s by nid %s\n",
1287                        uuid, libcfs_nid2str(entry->u.nids[0]));
1288
1289                 pos += strlen(uuid);
1290                 pos += sprintf(buf + pos, "::%u", entry->mne_instance);
1291                 LASSERT(pos < bufsz);
1292
1293                 lustre_cfg_bufs_set_string(&bufs, 1, params);
1294
1295                 rc = -ENOMEM;
1296                 lcfg = lustre_cfg_new(LCFG_PARAM, &bufs);
1297                 if (IS_ERR(lcfg)) {
1298                         CERROR("mgc: cannot allocate memory\n");
1299                         break;
1300                 }
1301
1302                 CDEBUG(D_INFO, "ir apply logs %lld/%lld for %s -> %s\n",
1303                        prev_version, max_version, obdname, params);
1304
1305                 rc = class_process_config(lcfg);
1306                 lustre_cfg_free(lcfg);
1307                 if (rc)
1308                         CDEBUG(D_INFO, "process config for %s error %d\n",
1309                                obdname, rc);
1310
1311                 /* continue, even one with error */
1312         }
1313
1314         kfree(inst);
1315         return rc;
1316 }
1317
1318 /**
1319  * This function is called if this client was notified for target restarting
1320  * by the MGS. A CONFIG_READ RPC is going to send to fetch recovery logs.
1321  */
1322 static int mgc_process_recover_log(struct obd_device *obd,
1323                                    struct config_llog_data *cld)
1324 {
1325         struct ptlrpc_request *req = NULL;
1326         struct config_llog_instance *cfg = &cld->cld_cfg;
1327         struct mgs_config_body *body;
1328         struct mgs_config_res  *res;
1329         struct ptlrpc_bulk_desc *desc;
1330         struct page **pages;
1331         int nrpages;
1332         bool eof = true;
1333         bool mne_swab;
1334         int i;
1335         int ealen;
1336         int rc;
1337
1338         /* allocate buffer for bulk transfer.
1339          * if this is the first time for this mgs to read logs,
1340          * CONFIG_READ_NRPAGES_INIT will be used since it will read all logs
1341          * once; otherwise, it only reads increment of logs, this should be
1342          * small and CONFIG_READ_NRPAGES will be used.
1343          */
1344         nrpages = CONFIG_READ_NRPAGES;
1345         if (cfg->cfg_last_idx == 0) /* the first time */
1346                 nrpages = CONFIG_READ_NRPAGES_INIT;
1347
1348         pages = kcalloc(nrpages, sizeof(*pages), GFP_KERNEL);
1349         if (!pages) {
1350                 rc = -ENOMEM;
1351                 goto out;
1352         }
1353
1354         for (i = 0; i < nrpages; i++) {
1355                 pages[i] = alloc_page(GFP_KERNEL);
1356                 if (!pages[i]) {
1357                         rc = -ENOMEM;
1358                         goto out;
1359                 }
1360         }
1361
1362 again:
1363         LASSERT(cld_is_recover(cld));
1364         LASSERT(mutex_is_locked(&cld->cld_lock));
1365         req = ptlrpc_request_alloc(class_exp2cliimp(cld->cld_mgcexp),
1366                                    &RQF_MGS_CONFIG_READ);
1367         if (!req) {
1368                 rc = -ENOMEM;
1369                 goto out;
1370         }
1371
1372         rc = ptlrpc_request_pack(req, LUSTRE_MGS_VERSION, MGS_CONFIG_READ);
1373         if (rc)
1374                 goto out;
1375
1376         /* pack request */
1377         body = req_capsule_client_get(&req->rq_pill, &RMF_MGS_CONFIG_BODY);
1378         LASSERT(sizeof(body->mcb_name) > strlen(cld->cld_logname));
1379         if (strlcpy(body->mcb_name, cld->cld_logname, sizeof(body->mcb_name))
1380             >= sizeof(body->mcb_name)) {
1381                 rc = -E2BIG;
1382                 goto out;
1383         }
1384         body->mcb_offset = cfg->cfg_last_idx + 1;
1385         body->mcb_type   = cld->cld_type;
1386         body->mcb_bits   = PAGE_SHIFT;
1387         body->mcb_units  = nrpages;
1388
1389         /* allocate bulk transfer descriptor */
1390         desc = ptlrpc_prep_bulk_imp(req, nrpages, 1, BULK_PUT_SINK,
1391                                     MGS_BULK_PORTAL);
1392         if (!desc) {
1393                 rc = -ENOMEM;
1394                 goto out;
1395         }
1396
1397         for (i = 0; i < nrpages; i++)
1398                 ptlrpc_prep_bulk_page_pin(desc, pages[i], 0, PAGE_SIZE);
1399
1400         ptlrpc_request_set_replen(req);
1401         rc = ptlrpc_queue_wait(req);
1402         if (rc)
1403                 goto out;
1404
1405         res = req_capsule_server_get(&req->rq_pill, &RMF_MGS_CONFIG_RES);
1406         if (res->mcr_size < res->mcr_offset) {
1407                 rc = -EINVAL;
1408                 goto out;
1409         }
1410
1411         /* always update the index even though it might have errors with
1412          * handling the recover logs
1413          */
1414         cfg->cfg_last_idx = res->mcr_offset;
1415         eof = res->mcr_offset == res->mcr_size;
1416
1417         CDEBUG(D_INFO, "Latest version %lld, more %d.\n",
1418                res->mcr_offset, eof == false);
1419
1420         ealen = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, 0);
1421         if (ealen < 0) {
1422                 rc = ealen;
1423                 goto out;
1424         }
1425
1426         if (ealen > nrpages << PAGE_SHIFT) {
1427                 rc = -EINVAL;
1428                 goto out;
1429         }
1430
1431         if (ealen == 0) { /* no logs transferred */
1432                 if (!eof)
1433                         rc = -EINVAL;
1434                 goto out;
1435         }
1436
1437         mne_swab = !!ptlrpc_rep_need_swab(req);
1438 #if OBD_OCD_VERSION(3, 0, 53, 0) > LUSTRE_VERSION_CODE
1439         /* This import flag means the server did an extra swab of IR MNE
1440          * records (fixed in LU-1252), reverse it here if needed. LU-1644
1441          */
1442         if (unlikely(req->rq_import->imp_need_mne_swab))
1443                 mne_swab = !mne_swab;
1444 #endif
1445
1446         for (i = 0; i < nrpages && ealen > 0; i++) {
1447                 int rc2;
1448                 void *ptr;
1449
1450                 ptr = kmap(pages[i]);
1451                 rc2 = mgc_apply_recover_logs(obd, cld, res->mcr_offset, ptr,
1452                                              min_t(int, ealen, PAGE_SIZE),
1453                                              mne_swab);
1454                 kunmap(pages[i]);
1455                 if (rc2 < 0) {
1456                         CWARN("Process recover log %s error %d\n",
1457                               cld->cld_logname, rc2);
1458                         break;
1459                 }
1460
1461                 ealen -= PAGE_SIZE;
1462         }
1463
1464 out:
1465         if (req)
1466                 ptlrpc_req_finished(req);
1467
1468         if (rc == 0 && !eof)
1469                 goto again;
1470
1471         if (pages) {
1472                 for (i = 0; i < nrpages; i++) {
1473                         if (!pages[i])
1474                                 break;
1475                         __free_page(pages[i]);
1476                 }
1477                 kfree(pages);
1478         }
1479         return rc;
1480 }
1481
1482 /* local_only means it cannot get remote llogs */
1483 static int mgc_process_cfg_log(struct obd_device *mgc,
1484                                struct config_llog_data *cld, int local_only)
1485 {
1486         struct llog_ctxt        *ctxt;
1487         struct lustre_sb_info   *lsi = NULL;
1488         int                      rc = 0;
1489         bool                     sptlrpc_started = false;
1490         struct lu_env           *env;
1491
1492         LASSERT(cld);
1493         LASSERT(mutex_is_locked(&cld->cld_lock));
1494
1495         /*
1496          * local copy of sptlrpc log is controlled elsewhere, don't try to
1497          * read it up here.
1498          */
1499         if (cld_is_sptlrpc(cld) && local_only)
1500                 return 0;
1501
1502         if (cld->cld_cfg.cfg_sb)
1503                 lsi = s2lsi(cld->cld_cfg.cfg_sb);
1504
1505         env = kzalloc(sizeof(*env), GFP_KERNEL);
1506         if (!env)
1507                 return -ENOMEM;
1508
1509         rc = lu_env_init(env, LCT_MG_THREAD);
1510         if (rc)
1511                 goto out_free;
1512
1513         ctxt = llog_get_context(mgc, LLOG_CONFIG_REPL_CTXT);
1514         LASSERT(ctxt);
1515
1516         if (local_only) /* no local log at client side */ {
1517                 rc = -EIO;
1518                 goto out_pop;
1519         }
1520
1521         if (cld_is_sptlrpc(cld)) {
1522                 sptlrpc_conf_log_update_begin(cld->cld_logname);
1523                 sptlrpc_started = true;
1524         }
1525
1526         /* logname and instance info should be the same, so use our
1527          * copy of the instance for the update.  The cfg_last_idx will
1528          * be updated here.
1529          */
1530         rc = class_config_parse_llog(env, ctxt, cld->cld_logname,
1531                                      &cld->cld_cfg);
1532
1533 out_pop:
1534         __llog_ctxt_put(env, ctxt);
1535
1536         /*
1537          * update settings on existing OBDs. doing it inside
1538          * of llog_process_lock so no device is attaching/detaching
1539          * in parallel.
1540          * the logname must be <fsname>-sptlrpc
1541          */
1542         if (sptlrpc_started) {
1543                 LASSERT(cld_is_sptlrpc(cld));
1544                 sptlrpc_conf_log_update_end(cld->cld_logname);
1545                 class_notify_sptlrpc_conf(cld->cld_logname,
1546                                           strlen(cld->cld_logname) -
1547                                           strlen("-sptlrpc"));
1548         }
1549
1550         lu_env_fini(env);
1551 out_free:
1552         kfree(env);
1553         return rc;
1554 }
1555
1556 /** Get a config log from the MGS and process it.
1557  * This func is called for both clients and servers.
1558  * Copy the log locally before parsing it if appropriate (non-MGS server)
1559  */
1560 int mgc_process_log(struct obd_device *mgc, struct config_llog_data *cld)
1561 {
1562         struct lustre_handle lockh = { 0 };
1563         __u64 flags = LDLM_FL_NO_LRU;
1564         int rc = 0, rcl;
1565
1566         LASSERT(cld);
1567
1568         /* I don't want multiple processes running process_log at once --
1569          * sounds like badness.  It actually might be fine, as long as
1570          * we're not trying to update from the same log
1571          * simultaneously (in which case we should use a per-log sem.)
1572          */
1573         mutex_lock(&cld->cld_lock);
1574         if (cld->cld_stopping) {
1575                 mutex_unlock(&cld->cld_lock);
1576                 return 0;
1577         }
1578
1579         OBD_FAIL_TIMEOUT(OBD_FAIL_MGC_PAUSE_PROCESS_LOG, 20);
1580
1581         CDEBUG(D_MGC, "Process log %s:%p from %d\n", cld->cld_logname,
1582                cld->cld_cfg.cfg_instance, cld->cld_cfg.cfg_last_idx + 1);
1583
1584         /* Get the cfg lock on the llog */
1585         rcl = mgc_enqueue(mgc->u.cli.cl_mgc_mgsexp, NULL, LDLM_PLAIN, NULL,
1586                           LCK_CR, &flags, NULL, NULL, NULL,
1587                           cld, 0, NULL, &lockh);
1588         if (rcl == 0) {
1589                 /* Get the cld, it will be released in mgc_blocking_ast. */
1590                 config_log_get(cld);
1591                 rc = ldlm_lock_set_data(&lockh, (void *)cld);
1592                 LASSERT(rc == 0);
1593         } else {
1594                 CDEBUG(D_MGC, "Can't get cfg lock: %d\n", rcl);
1595
1596                 /* mark cld_lostlock so that it will requeue
1597                  * after MGC becomes available.
1598                  */
1599                 cld->cld_lostlock = 1;
1600                 /* Get extra reference, it will be put in requeue thread */
1601                 config_log_get(cld);
1602         }
1603
1604         if (cld_is_recover(cld)) {
1605                 rc = 0; /* this is not a fatal error for recover log */
1606                 if (rcl == 0)
1607                         rc = mgc_process_recover_log(mgc, cld);
1608         } else {
1609                 rc = mgc_process_cfg_log(mgc, cld, rcl != 0);
1610         }
1611
1612         CDEBUG(D_MGC, "%s: configuration from log '%s' %sed (%d).\n",
1613                mgc->obd_name, cld->cld_logname, rc ? "fail" : "succeed", rc);
1614
1615         mutex_unlock(&cld->cld_lock);
1616
1617         /* Now drop the lock so MGS can revoke it */
1618         if (!rcl)
1619                 ldlm_lock_decref(&lockh, LCK_CR);
1620
1621         return rc;
1622 }
1623
1624 /** Called from lustre_process_log.
1625  * LCFG_LOG_START gets the config log from the MGS, processes it to start
1626  * any services, and adds it to the list logs to watch (follow).
1627  */
1628 static int mgc_process_config(struct obd_device *obd, u32 len, void *buf)
1629 {
1630         struct lustre_cfg *lcfg = buf;
1631         struct config_llog_instance *cfg = NULL;
1632         char *logname;
1633         int rc = 0;
1634
1635         switch (lcfg->lcfg_command) {
1636         case LCFG_LOV_ADD_OBD: {
1637                 /* Overloading this cfg command: register a new target */
1638                 struct mgs_target_info *mti;
1639
1640                 if (LUSTRE_CFG_BUFLEN(lcfg, 1) !=
1641                     sizeof(struct mgs_target_info)) {
1642                         rc = -EINVAL;
1643                         goto out;
1644                 }
1645
1646                 mti = (struct mgs_target_info *)lustre_cfg_buf(lcfg, 1);
1647                 CDEBUG(D_MGC, "add_target %s %#x\n",
1648                        mti->mti_svname, mti->mti_flags);
1649                 rc = mgc_target_register(obd->u.cli.cl_mgc_mgsexp, mti);
1650                 break;
1651         }
1652         case LCFG_LOV_DEL_OBD:
1653                 /* Unregister has no meaning at the moment. */
1654                 CERROR("lov_del_obd unimplemented\n");
1655                 rc = -ENOSYS;
1656                 break;
1657         case LCFG_SPTLRPC_CONF: {
1658                 rc = sptlrpc_process_config(lcfg);
1659                 break;
1660         }
1661         case LCFG_LOG_START: {
1662                 struct config_llog_data *cld;
1663                 struct super_block *sb;
1664
1665                 logname = lustre_cfg_string(lcfg, 1);
1666                 cfg = (struct config_llog_instance *)lustre_cfg_buf(lcfg, 2);
1667                 sb = *(struct super_block **)lustre_cfg_buf(lcfg, 3);
1668
1669                 CDEBUG(D_MGC, "parse_log %s from %d\n", logname,
1670                        cfg->cfg_last_idx);
1671
1672                 /* We're only called through here on the initial mount */
1673                 rc = config_log_add(obd, logname, cfg, sb);
1674                 if (rc)
1675                         break;
1676                 cld = config_log_find(logname, cfg);
1677                 if (!cld) {
1678                         rc = -ENOENT;
1679                         break;
1680                 }
1681
1682                 /* COMPAT_146 */
1683                 /* FIXME only set this for old logs!  Right now this forces
1684                  * us to always skip the "inside markers" check
1685                  */
1686                 cld->cld_cfg.cfg_flags |= CFG_F_COMPAT146;
1687
1688                 rc = mgc_process_log(obd, cld);
1689                 if (rc == 0 && cld->cld_recover) {
1690                         if (OCD_HAS_FLAG(&obd->u.cli.cl_import->
1691                                          imp_connect_data, IMP_RECOV)) {
1692                                 rc = mgc_process_log(obd, cld->cld_recover);
1693                         } else {
1694                                 struct config_llog_data *cir = cld->cld_recover;
1695
1696                                 cld->cld_recover = NULL;
1697                                 config_log_put(cir);
1698                         }
1699                         if (rc)
1700                                 CERROR("Cannot process recover llog %d\n", rc);
1701                 }
1702
1703                 if (rc == 0 && cld->cld_params) {
1704                         rc = mgc_process_log(obd, cld->cld_params);
1705                         if (rc == -ENOENT) {
1706                                 CDEBUG(D_MGC,
1707                                        "There is no params config file yet\n");
1708                                 rc = 0;
1709                         }
1710                         /* params log is optional */
1711                         if (rc)
1712                                 CERROR(
1713                                        "%s: can't process params llog: rc = %d\n",
1714                                        obd->obd_name, rc);
1715                 }
1716                 config_log_put(cld);
1717
1718                 break;
1719         }
1720         case LCFG_LOG_END: {
1721                 logname = lustre_cfg_string(lcfg, 1);
1722
1723                 if (lcfg->lcfg_bufcount >= 2)
1724                         cfg = (struct config_llog_instance *)lustre_cfg_buf(
1725                                 lcfg, 2);
1726                 rc = config_log_end(logname, cfg);
1727                 break;
1728         }
1729         default: {
1730                 CERROR("Unknown command: %d\n", lcfg->lcfg_command);
1731                 rc = -EINVAL;
1732                 goto out;
1733         }
1734         }
1735 out:
1736         return rc;
1737 }
1738
1739 static struct obd_ops mgc_obd_ops = {
1740         .owner          = THIS_MODULE,
1741         .setup          = mgc_setup,
1742         .precleanup     = mgc_precleanup,
1743         .cleanup        = mgc_cleanup,
1744         .add_conn       = client_import_add_conn,
1745         .del_conn       = client_import_del_conn,
1746         .connect        = client_connect_import,
1747         .disconnect     = client_disconnect_export,
1748         .set_info_async = mgc_set_info_async,
1749         .get_info       = mgc_get_info,
1750         .import_event   = mgc_import_event,
1751         .process_config = mgc_process_config,
1752 };
1753
1754 static int __init mgc_init(void)
1755 {
1756         return class_register_type(&mgc_obd_ops, NULL,
1757                                    LUSTRE_MGC_NAME, NULL);
1758 }
1759
1760 static void /*__exit*/ mgc_exit(void)
1761 {
1762         class_unregister_type(LUSTRE_MGC_NAME);
1763 }
1764
/* Module metadata: author, description, version and license. */
MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Management Client");
MODULE_VERSION(LUSTRE_VERSION_STRING);
MODULE_LICENSE("GPL");

/* Register mgc_init()/mgc_exit() as the module's load and unload hooks. */
module_init(mgc_init);
module_exit(mgc_exit);