GNU Linux-libre 4.9.330-gnu1
[releases.git] / drivers / staging / lustre / lustre / fid / fid_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2015, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/fid/fid_request.c
33  *
34  * Lustre Sequence Manager
35  *
36  * Author: Yury Umanets <umka@clusterfs.com>
37  */
38
39 #define DEBUG_SUBSYSTEM S_FID
40
41 #include "../../include/linux/libcfs/libcfs.h"
42 #include <linux/module.h>
43
44 #include "../include/obd.h"
45 #include "../include/obd_class.h"
46 #include "../include/obd_support.h"
47 #include "../include/lustre_fid.h"
48 /* mdc RPC locks */
49 #include "../include/lustre_mdc.h"
50 #include "fid_internal.h"
51
52 static struct dentry *seq_debugfs_dir;
53
54 static int seq_client_rpc(struct lu_client_seq *seq,
55                           struct lu_seq_range *output, __u32 opc,
56                           const char *opcname)
57 {
58         struct obd_export     *exp = seq->lcs_exp;
59         struct ptlrpc_request *req;
60         struct lu_seq_range   *out, *in;
61         __u32                 *op;
62         unsigned int           debug_mask;
63         int                    rc;
64
65         LASSERT(exp && !IS_ERR(exp));
66         req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), &RQF_SEQ_QUERY,
67                                         LUSTRE_MDS_VERSION, SEQ_QUERY);
68         if (!req)
69                 return -ENOMEM;
70
71         /* Init operation code */
72         op = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_OPC);
73         *op = opc;
74
75         /* Zero out input range, this is not recovery yet. */
76         in = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_RANGE);
77         range_init(in);
78
79         ptlrpc_request_set_replen(req);
80
81         in->lsr_index = seq->lcs_space.lsr_index;
82         if (seq->lcs_type == LUSTRE_SEQ_METADATA)
83                 fld_range_set_mdt(in);
84         else
85                 fld_range_set_ost(in);
86
87         if (opc == SEQ_ALLOC_SUPER) {
88                 req->rq_request_portal = SEQ_CONTROLLER_PORTAL;
89                 req->rq_reply_portal = MDC_REPLY_PORTAL;
90                 /* During allocating super sequence for data object,
91                  * the current thread might hold the export of MDT0(MDT0
92                  * precreating objects on this OST), and it will send the
93                  * request to MDT0 here, so we can not keep resending the
94                  * request here, otherwise if MDT0 is failed(umounted),
95                  * it can not release the export of MDT0
96                  */
97                 if (seq->lcs_type == LUSTRE_SEQ_DATA) {
98                         req->rq_no_delay = 1;
99                         req->rq_no_resend = 1;
100                 }
101                 debug_mask = D_CONSOLE;
102         } else {
103                 if (seq->lcs_type == LUSTRE_SEQ_METADATA) {
104                         req->rq_reply_portal = MDC_REPLY_PORTAL;
105                         req->rq_request_portal = SEQ_METADATA_PORTAL;
106                 } else {
107                         req->rq_reply_portal = OSC_REPLY_PORTAL;
108                         req->rq_request_portal = SEQ_DATA_PORTAL;
109                 }
110                 debug_mask = D_INFO;
111         }
112
113         ptlrpc_at_set_req_timeout(req);
114
115         if (opc != SEQ_ALLOC_SUPER && seq->lcs_type == LUSTRE_SEQ_METADATA)
116                 mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
117         rc = ptlrpc_queue_wait(req);
118         if (opc != SEQ_ALLOC_SUPER && seq->lcs_type == LUSTRE_SEQ_METADATA)
119                 mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
120         if (rc)
121                 goto out_req;
122
123         out = req_capsule_server_get(&req->rq_pill, &RMF_SEQ_RANGE);
124         *output = *out;
125
126         if (!range_is_sane(output)) {
127                 CERROR("%s: Invalid range received from server: "
128                        DRANGE "\n", seq->lcs_name, PRANGE(output));
129                 rc = -EINVAL;
130                 goto out_req;
131         }
132
133         if (range_is_exhausted(output)) {
134                 CERROR("%s: Range received from server is exhausted: "
135                        DRANGE "]\n", seq->lcs_name, PRANGE(output));
136                 rc = -EINVAL;
137                 goto out_req;
138         }
139
140         CDEBUG_LIMIT(debug_mask, "%s: Allocated %s-sequence " DRANGE "]\n",
141                      seq->lcs_name, opcname, PRANGE(output));
142
143 out_req:
144         ptlrpc_req_finished(req);
145         return rc;
146 }
147
148 /* Request sequence-controller node to allocate new meta-sequence. */
149 static int seq_client_alloc_meta(const struct lu_env *env,
150                                  struct lu_client_seq *seq)
151 {
152         int rc;
153
154         do {
155                 /* If meta server return -EINPROGRESS or EAGAIN,
156                  * it means meta server might not be ready to
157                  * allocate super sequence from sequence controller
158                  * (MDT0)yet
159                  */
160                 rc = seq_client_rpc(seq, &seq->lcs_space,
161                                     SEQ_ALLOC_META, "meta");
162         } while (rc == -EINPROGRESS || rc == -EAGAIN);
163
164         return rc;
165 }
166
167 /* Allocate new sequence for client. */
168 static int seq_client_alloc_seq(const struct lu_env *env,
169                                 struct lu_client_seq *seq, u64 *seqnr)
170 {
171         int rc;
172
173         LASSERT(range_is_sane(&seq->lcs_space));
174
175         if (range_is_exhausted(&seq->lcs_space)) {
176                 rc = seq_client_alloc_meta(env, seq);
177                 if (rc) {
178                         CERROR("%s: Can't allocate new meta-sequence, rc %d\n",
179                                seq->lcs_name, rc);
180                         return rc;
181                 }
182                 CDEBUG(D_INFO, "%s: New range - " DRANGE "\n",
183                        seq->lcs_name, PRANGE(&seq->lcs_space));
184         } else {
185                 rc = 0;
186         }
187
188         LASSERT(!range_is_exhausted(&seq->lcs_space));
189         *seqnr = seq->lcs_space.lsr_start;
190         seq->lcs_space.lsr_start += 1;
191
192         CDEBUG(D_INFO, "%s: Allocated sequence [%#llx]\n", seq->lcs_name,
193                *seqnr);
194
195         return rc;
196 }
197
198 static int seq_fid_alloc_prep(struct lu_client_seq *seq,
199                               wait_queue_t *link)
200 {
201         if (seq->lcs_update) {
202                 add_wait_queue(&seq->lcs_waitq, link);
203                 set_current_state(TASK_UNINTERRUPTIBLE);
204                 mutex_unlock(&seq->lcs_mutex);
205
206                 schedule();
207
208                 mutex_lock(&seq->lcs_mutex);
209                 remove_wait_queue(&seq->lcs_waitq, link);
210                 set_current_state(TASK_RUNNING);
211                 return -EAGAIN;
212         }
213         ++seq->lcs_update;
214         mutex_unlock(&seq->lcs_mutex);
215         return 0;
216 }
217
218 static void seq_fid_alloc_fini(struct lu_client_seq *seq)
219 {
220         LASSERT(seq->lcs_update == 1);
221         mutex_lock(&seq->lcs_mutex);
222         --seq->lcs_update;
223         wake_up(&seq->lcs_waitq);
224 }
225
226 /* Allocate new fid on passed client @seq and save it to @fid. */
227 int seq_client_alloc_fid(const struct lu_env *env,
228                          struct lu_client_seq *seq, struct lu_fid *fid)
229 {
230         wait_queue_t link;
231         int rc;
232
233         LASSERT(seq);
234         LASSERT(fid);
235
236         init_waitqueue_entry(&link, current);
237         mutex_lock(&seq->lcs_mutex);
238
239         if (OBD_FAIL_CHECK(OBD_FAIL_SEQ_EXHAUST))
240                 seq->lcs_fid.f_oid = seq->lcs_width;
241
242         while (1) {
243                 u64 seqnr;
244
245                 if (!fid_is_zero(&seq->lcs_fid) &&
246                     fid_oid(&seq->lcs_fid) < seq->lcs_width) {
247                         /* Just bump last allocated fid and return to caller. */
248                         seq->lcs_fid.f_oid += 1;
249                         rc = 0;
250                         break;
251                 }
252
253                 rc = seq_fid_alloc_prep(seq, &link);
254                 if (rc)
255                         continue;
256
257                 rc = seq_client_alloc_seq(env, seq, &seqnr);
258                 if (rc) {
259                         CERROR("%s: Can't allocate new sequence, rc %d\n",
260                                seq->lcs_name, rc);
261                         seq_fid_alloc_fini(seq);
262                         mutex_unlock(&seq->lcs_mutex);
263                         return rc;
264                 }
265
266                 CDEBUG(D_INFO, "%s: Switch to sequence [0x%16.16Lx]\n",
267                        seq->lcs_name, seqnr);
268
269                 seq->lcs_fid.f_oid = LUSTRE_FID_INIT_OID;
270                 seq->lcs_fid.f_seq = seqnr;
271                 seq->lcs_fid.f_ver = 0;
272
273                 /*
274                  * Inform caller that sequence switch is performed to allow it
275                  * to setup FLD for it.
276                  */
277                 rc = 1;
278
279                 seq_fid_alloc_fini(seq);
280                 break;
281         }
282
283         *fid = seq->lcs_fid;
284         mutex_unlock(&seq->lcs_mutex);
285
286         CDEBUG(D_INFO, "%s: Allocated FID "DFID"\n", seq->lcs_name,  PFID(fid));
287         return rc;
288 }
289 EXPORT_SYMBOL(seq_client_alloc_fid);
290
291 /*
292  * Finish the current sequence due to disconnect.
293  * See mdc_import_event()
294  */
295 void seq_client_flush(struct lu_client_seq *seq)
296 {
297         wait_queue_t link;
298
299         LASSERT(seq);
300         init_waitqueue_entry(&link, current);
301         mutex_lock(&seq->lcs_mutex);
302
303         while (seq->lcs_update) {
304                 add_wait_queue(&seq->lcs_waitq, &link);
305                 set_current_state(TASK_UNINTERRUPTIBLE);
306                 mutex_unlock(&seq->lcs_mutex);
307
308                 schedule();
309
310                 mutex_lock(&seq->lcs_mutex);
311                 remove_wait_queue(&seq->lcs_waitq, &link);
312                 set_current_state(TASK_RUNNING);
313         }
314
315         fid_zero(&seq->lcs_fid);
316         /**
317          * this id shld not be used for seq range allocation.
318          * set to -1 for dgb check.
319          */
320
321         seq->lcs_space.lsr_index = -1;
322
323         range_init(&seq->lcs_space);
324         mutex_unlock(&seq->lcs_mutex);
325 }
326 EXPORT_SYMBOL(seq_client_flush);
327
328 static void seq_client_debugfs_fini(struct lu_client_seq *seq)
329 {
330         if (!IS_ERR_OR_NULL(seq->lcs_debugfs_entry))
331                 ldebugfs_remove(&seq->lcs_debugfs_entry);
332 }
333
334 static int seq_client_debugfs_init(struct lu_client_seq *seq)
335 {
336         int rc;
337
338         seq->lcs_debugfs_entry = ldebugfs_register(seq->lcs_name,
339                                                    seq_debugfs_dir,
340                                                    NULL, NULL);
341
342         if (IS_ERR_OR_NULL(seq->lcs_debugfs_entry)) {
343                 CERROR("%s: LdebugFS failed in seq-init\n", seq->lcs_name);
344                 rc = seq->lcs_debugfs_entry ? PTR_ERR(seq->lcs_debugfs_entry)
345                                             : -ENOMEM;
346                 seq->lcs_debugfs_entry = NULL;
347                 return rc;
348         }
349
350         rc = ldebugfs_add_vars(seq->lcs_debugfs_entry,
351                                seq_client_debugfs_list, seq);
352         if (rc) {
353                 CERROR("%s: Can't init sequence manager debugfs, rc %d\n",
354                        seq->lcs_name, rc);
355                 goto out_cleanup;
356         }
357
358         return 0;
359
360 out_cleanup:
361         seq_client_debugfs_fini(seq);
362         return rc;
363 }
364
365 static void seq_client_fini(struct lu_client_seq *seq)
366 {
367         seq_client_debugfs_fini(seq);
368
369         if (seq->lcs_exp) {
370                 class_export_put(seq->lcs_exp);
371                 seq->lcs_exp = NULL;
372         }
373 }
374
375 static int seq_client_init(struct lu_client_seq *seq,
376                            struct obd_export *exp,
377                            enum lu_cli_type type,
378                            const char *prefix)
379 {
380         int rc;
381
382         LASSERT(seq);
383         LASSERT(prefix);
384
385         seq->lcs_type = type;
386
387         mutex_init(&seq->lcs_mutex);
388         if (type == LUSTRE_SEQ_METADATA)
389                 seq->lcs_width = LUSTRE_METADATA_SEQ_MAX_WIDTH;
390         else
391                 seq->lcs_width = LUSTRE_DATA_SEQ_MAX_WIDTH;
392
393         init_waitqueue_head(&seq->lcs_waitq);
394         /* Make sure that things are clear before work is started. */
395         seq_client_flush(seq);
396
397         seq->lcs_exp = class_export_get(exp);
398
399         snprintf(seq->lcs_name, sizeof(seq->lcs_name),
400                  "cli-%s", prefix);
401
402         rc = seq_client_debugfs_init(seq);
403         if (rc)
404                 seq_client_fini(seq);
405         return rc;
406 }
407
408 int client_fid_init(struct obd_device *obd,
409                     struct obd_export *exp, enum lu_cli_type type)
410 {
411         struct client_obd *cli = &obd->u.cli;
412         char *prefix;
413         int rc;
414
415         cli->cl_seq = kzalloc(sizeof(*cli->cl_seq), GFP_NOFS);
416         if (!cli->cl_seq)
417                 return -ENOMEM;
418
419         prefix = kzalloc(MAX_OBD_NAME + 5, GFP_NOFS);
420         if (!prefix) {
421                 rc = -ENOMEM;
422                 goto out_free_seq;
423         }
424
425         snprintf(prefix, MAX_OBD_NAME + 5, "cli-%s", obd->obd_name);
426
427         /* Init client side sequence-manager */
428         rc = seq_client_init(cli->cl_seq, exp, type, prefix);
429         kfree(prefix);
430         if (rc)
431                 goto out_free_seq;
432
433         return rc;
434 out_free_seq:
435         kfree(cli->cl_seq);
436         cli->cl_seq = NULL;
437         return rc;
438 }
439 EXPORT_SYMBOL(client_fid_init);
440
441 int client_fid_fini(struct obd_device *obd)
442 {
443         struct client_obd *cli = &obd->u.cli;
444
445         if (cli->cl_seq) {
446                 seq_client_fini(cli->cl_seq);
447                 kfree(cli->cl_seq);
448                 cli->cl_seq = NULL;
449         }
450
451         return 0;
452 }
453 EXPORT_SYMBOL(client_fid_fini);
454
455 static int __init fid_init(void)
456 {
457         seq_debugfs_dir = ldebugfs_register(LUSTRE_SEQ_NAME,
458                                             debugfs_lustre_root,
459                                             NULL, NULL);
460         return PTR_ERR_OR_ZERO(seq_debugfs_dir);
461 }
462
463 static void __exit fid_exit(void)
464 {
465         if (!IS_ERR_OR_NULL(seq_debugfs_dir))
466                 ldebugfs_remove(&seq_debugfs_dir);
467 }
468
469 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
470 MODULE_DESCRIPTION("Lustre File IDentifier");
471 MODULE_VERSION(LUSTRE_VERSION_STRING);
472 MODULE_LICENSE("GPL");
473
474 module_init(fid_init);
475 module_exit(fid_exit);