dlm/lockspace.c
// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

#include <linux/module.h>

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "dir.h"
#include "midcomms.h"
#include "config.h"
#include "memory.h"
#include "lock.h"
#include "recover.h"
#include "requestqueue.h"
#include "user.h"
#include "ast.h"
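/* ls_lock serializes lockspace create/release (and the start/stop of the
   global threads tied to ls_count); lslist_lock protects lslist, the list
   of all active lockspaces. */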
static int                      ls_count;
static struct mutex             ls_lock;
static struct list_head         lslist;
static spinlock_t               lslist_lock;
static struct task_struct *     scand_task;

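/* Lockspace sysfs files live under /sys/kernel/dlm/<name>/ (the "dlm"
   kset is added to kernel_kobj in dlm_lockspace_init below).
   dlm_controld writes 0 or 1 to the "control" file to stop or start a
   lockspace around recovery. */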
static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        ssize_t ret = len;
        int n;
        int rc = kstrtoint(buf, 0, &n);

        if (rc)
                return rc;
        ls = dlm_find_lockspace_local(ls->ls_local_handle);
        if (!ls)
                return -EINVAL;

        switch (n) {
        case 0:
                dlm_ls_stop(ls);
                break;
        case 1:
                dlm_ls_start(ls);
                break;
        default:
                ret = -EINVAL;
        }
        dlm_put_lockspace(ls);
        return ret;
}

static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);

        if (rc)
                return rc;
        set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
        wake_up(&ls->ls_uevent_wait);
        return len;
}

static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
{
        return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
}

static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        int rc = kstrtouint(buf, 0, &ls->ls_global_id);

        if (rc)
                return rc;
        return len;
}

static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
{
        return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
}

static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        int val;
        int rc = kstrtoint(buf, 0, &val);

        if (rc)
                return rc;
        if (val == 1)
                set_bit(LSFL_NODIR, &ls->ls_flags);
        return len;
}

static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
{
        uint32_t status = dlm_recover_status(ls);
        return snprintf(buf, PAGE_SIZE, "%x\n", status);
}

static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
{
        return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
}

struct dlm_attr {
        struct attribute attr;
        ssize_t (*show)(struct dlm_ls *, char *);
        ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};

static struct dlm_attr dlm_attr_control = {
        .attr  = {.name = "control", .mode = S_IWUSR},
        .store = dlm_control_store
};

static struct dlm_attr dlm_attr_event = {
        .attr  = {.name = "event_done", .mode = S_IWUSR},
        .store = dlm_event_store
};

static struct dlm_attr dlm_attr_id = {
        .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
        .show  = dlm_id_show,
        .store = dlm_id_store
};

static struct dlm_attr dlm_attr_nodir = {
        .attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
        .show  = dlm_nodir_show,
        .store = dlm_nodir_store
};

static struct dlm_attr dlm_attr_recover_status = {
        .attr  = {.name = "recover_status", .mode = S_IRUGO},
        .show  = dlm_recover_status_show
};

static struct dlm_attr dlm_attr_recover_nodeid = {
        .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
        .show  = dlm_recover_nodeid_show
};

static struct attribute *dlm_attrs[] = {
        &dlm_attr_control.attr,
        &dlm_attr_event.attr,
        &dlm_attr_id.attr,
        &dlm_attr_nodir.attr,
        &dlm_attr_recover_status.attr,
        &dlm_attr_recover_nodeid.attr,
        NULL,
};
ATTRIBUTE_GROUPS(dlm);

static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
                             char *buf)
{
        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
        return a->show ? a->show(ls, buf) : 0;
}

static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
                              const char *buf, size_t len)
{
        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
        return a->store ? a->store(ls, buf, len) : len;
}

static void lockspace_kobj_release(struct kobject *k)
{
        struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
        kfree(ls);
}

static const struct sysfs_ops dlm_attr_ops = {
        .show  = dlm_attr_show,
        .store = dlm_attr_store,
};

static struct kobj_type dlm_ktype = {
        .default_groups = dlm_groups,
        .sysfs_ops     = &dlm_attr_ops,
        .release       = lockspace_kobj_release,
};

static struct kset *dlm_kset;

static int do_uevent(struct dlm_ls *ls, int in)
{
        if (in)
                kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
        else
                kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);

        log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");

        /* dlm_controld will see the uevent, do the necessary group management
           and then write to sysfs to wake us */

        wait_event(ls->ls_uevent_wait,
                   test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));

        log_rinfo(ls, "group event done %d", ls->ls_uevent_result);

        return ls->ls_uevent_result;
}
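/* Tag each online/offline uevent with LOCKSPACE=<name> so dlm_controld
   can tell which lockspace the event refers to. */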
static int dlm_uevent(struct kobject *kobj, struct kobj_uevent_env *env)
{
        struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);

        add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
        return 0;
}

static const struct kset_uevent_ops dlm_uevent_ops = {
        .uevent = dlm_uevent,
};

int __init dlm_lockspace_init(void)
{
        ls_count = 0;
        mutex_init(&ls_lock);
        INIT_LIST_HEAD(&lslist);
        spin_lock_init(&lslist_lock);

        dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
        if (!dlm_kset) {
                printk(KERN_WARNING "%s: can not create kset\n", __func__);
                return -ENOMEM;
        }
        return 0;
}

void dlm_lockspace_exit(void)
{
        kset_unregister(dlm_kset);
}

static struct dlm_ls *find_ls_to_scan(void)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (time_after_eq(jiffies, ls->ls_scan_time +
                                            dlm_config.ci_scan_secs * HZ)) {
                        spin_unlock(&lslist_lock);
                        return ls;
                }
        }
        spin_unlock(&lslist_lock);
        return NULL;
}
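/* dlm_scand periodically visits each lockspace whose scan interval
   (dlm_config.ci_scan_secs) has elapsed, freeing unused rsbs and checking
   lock timeouts; if the lockspace is busy with recovery, the scan is
   retried about a second later. */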
static int dlm_scand(void *data)
{
        struct dlm_ls *ls;

        while (!kthread_should_stop()) {
                ls = find_ls_to_scan();
                if (ls) {
                        if (dlm_lock_recovery_try(ls)) {
                                ls->ls_scan_time = jiffies;
                                dlm_scan_rsbs(ls);
                                dlm_scan_timeout(ls);
                                dlm_unlock_recovery(ls);
                        } else {
                                ls->ls_scan_time += HZ;
                        }
                        continue;
                }
                schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
        }
        return 0;
}

static int dlm_scand_start(void)
{
        struct task_struct *p;
        int error = 0;

        p = kthread_run(dlm_scand, NULL, "dlm_scand");
        if (IS_ERR(p))
                error = PTR_ERR(p);
        else
                scand_task = p;
        return error;
}

static void dlm_scand_stop(void)
{
        kthread_stop(scand_task);
}

struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);

        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_global_id == id) {
                        atomic_inc(&ls->ls_count);
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_local_handle == lockspace) {
                        atomic_inc(&ls->ls_count);
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

struct dlm_ls *dlm_find_lockspace_device(int minor)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_device.minor == minor) {
                        atomic_inc(&ls->ls_count);
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}
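/* Each dlm_find_lockspace_*() above takes a reference on the lockspace
   (ls_count); dlm_put_lockspace() drops it and wakes remove_lockspace()
   once the count reaches zero. */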
void dlm_put_lockspace(struct dlm_ls *ls)
{
        if (atomic_dec_and_test(&ls->ls_count))
                wake_up(&ls->ls_count_wait);
}
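/* Wait for all references to be dropped, then unlink the lockspace from
   lslist.  The count is rechecked under lslist_lock because a new
   reference could be taken between the wakeup and taking the lock. */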
static void remove_lockspace(struct dlm_ls *ls)
{
retry:
        wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);

        spin_lock(&lslist_lock);
        if (atomic_read(&ls->ls_count) != 0) {
                spin_unlock(&lslist_lock);
                goto retry;
        }

        WARN_ON(ls->ls_create_count != 0);
        list_del(&ls->ls_list);
        spin_unlock(&lslist_lock);
}

static int threads_start(void)
{
        int error;

        /* Thread for sending/receiving messages for all lockspaces */
        error = dlm_midcomms_start();
        if (error) {
                log_print("cannot start dlm midcomms %d", error);
                goto fail;
        }

        error = dlm_scand_start();
        if (error) {
                log_print("cannot start dlm_scand thread %d", error);
                goto midcomms_fail;
        }

        return 0;

 midcomms_fail:
        dlm_midcomms_stop();
 fail:
        return error;
}
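/* Returns 0 when a new lockspace was created, 1 when an existing one was
   found (its ls_create_count is bumped instead), or a negative errno.
   __dlm_new_lockspace() below folds the 1 into 0 for callers. */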
static int new_lockspace(const char *name, const char *cluster,
                         uint32_t flags, int lvblen,
                         const struct dlm_lockspace_ops *ops, void *ops_arg,
                         int *ops_result, dlm_lockspace_t **lockspace)
{
        struct dlm_ls *ls;
        int i, size, error;
        int do_unreg = 0;
        int namelen = strlen(name);

        if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
                return -EINVAL;

        if (lvblen % 8)
                return -EINVAL;

        if (!try_module_get(THIS_MODULE))
                return -EINVAL;

        if (!dlm_user_daemon_available()) {
                log_print("dlm user daemon not available");
                error = -EUNATCH;
                goto out;
        }

        if (ops && ops_result) {
                if (!dlm_config.ci_recover_callbacks)
                        *ops_result = -EOPNOTSUPP;
                else
                        *ops_result = 0;
        }
        if (!cluster)
                log_print("dlm cluster name '%s' is being used without an application-provided cluster name",
                          dlm_config.ci_cluster_name);

        if (dlm_config.ci_recover_callbacks && cluster &&
            strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
                log_print("dlm cluster name '%s' does not match "
                          "the application cluster name '%s'",
                          dlm_config.ci_cluster_name, cluster);
                error = -EBADR;
                goto out;
        }

        error = 0;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                WARN_ON(ls->ls_create_count <= 0);
                if (ls->ls_namelen != namelen)
                        continue;
                if (memcmp(ls->ls_name, name, namelen))
                        continue;
                if (flags & DLM_LSFL_NEWEXCL) {
                        error = -EEXIST;
                        break;
                }
                ls->ls_create_count++;
                *lockspace = ls;
                error = 1;
                break;
        }
        spin_unlock(&lslist_lock);

        if (error)
                goto out;

        error = -ENOMEM;

        ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
        if (!ls)
                goto out;
        memcpy(ls->ls_name, name, namelen);
        ls->ls_namelen = namelen;
        ls->ls_lvblen = lvblen;
        atomic_set(&ls->ls_count, 0);
        init_waitqueue_head(&ls->ls_count_wait);
        ls->ls_flags = 0;
        ls->ls_scan_time = jiffies;

        if (ops && dlm_config.ci_recover_callbacks) {
                ls->ls_ops = ops;
                ls->ls_ops_arg = ops_arg;
        }

#ifdef CONFIG_DLM_DEPRECATED_API
        if (flags & DLM_LSFL_TIMEWARN) {
                pr_warn_once("===============================================================\n"
                             "WARNING: the dlm DLM_LSFL_TIMEWARN flag is being deprecated and\n"
                             "         will be removed in v6.2!\n"
                             "         This includes the DLM_LSFL_TIMEWARN define in the UAPI header!\n"
                             "===============================================================\n");

                set_bit(LSFL_TIMEWARN, &ls->ls_flags);
        }

        /* ls_exflags are forced to match among nodes, and we don't
         * need to require all nodes to have some flags set
         */
        ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
                                    DLM_LSFL_NEWEXCL));
#else
        /* ls_exflags are forced to match among nodes, and we don't
         * need to require all nodes to have some flags set
         */
        ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL));
#endif

        size = READ_ONCE(dlm_config.ci_rsbtbl_size);
        ls->ls_rsbtbl_size = size;

        ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
        if (!ls->ls_rsbtbl)
                goto out_lsfree;
        for (i = 0; i < size; i++) {
                ls->ls_rsbtbl[i].keep.rb_node = NULL;
                ls->ls_rsbtbl[i].toss.rb_node = NULL;
                spin_lock_init(&ls->ls_rsbtbl[i].lock);
        }

        spin_lock_init(&ls->ls_remove_spin);
        init_waitqueue_head(&ls->ls_remove_wait);

        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
                ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
                                                 GFP_KERNEL);
                if (!ls->ls_remove_names[i])
                        goto out_rsbtbl;
        }

        idr_init(&ls->ls_lkbidr);
        spin_lock_init(&ls->ls_lkbidr_spin);

        INIT_LIST_HEAD(&ls->ls_waiters);
        mutex_init(&ls->ls_waiters_mutex);
        INIT_LIST_HEAD(&ls->ls_orphans);
        mutex_init(&ls->ls_orphans_mutex);
#ifdef CONFIG_DLM_DEPRECATED_API
        INIT_LIST_HEAD(&ls->ls_timeout);
        mutex_init(&ls->ls_timeout_mutex);
#endif

        INIT_LIST_HEAD(&ls->ls_new_rsb);
        spin_lock_init(&ls->ls_new_rsb_spin);

        INIT_LIST_HEAD(&ls->ls_nodes);
        INIT_LIST_HEAD(&ls->ls_nodes_gone);
        ls->ls_num_nodes = 0;
        ls->ls_low_nodeid = 0;
        ls->ls_total_weight = 0;
        ls->ls_node_array = NULL;

        memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
        ls->ls_stub_rsb.res_ls = ls;

        ls->ls_debug_rsb_dentry = NULL;
        ls->ls_debug_waiters_dentry = NULL;

        init_waitqueue_head(&ls->ls_uevent_wait);
        ls->ls_uevent_result = 0;
        init_completion(&ls->ls_recovery_done);
        ls->ls_recovery_result = -1;

        mutex_init(&ls->ls_cb_mutex);
        INIT_LIST_HEAD(&ls->ls_cb_delay);

        ls->ls_recoverd_task = NULL;
        mutex_init(&ls->ls_recoverd_active);
        spin_lock_init(&ls->ls_recover_lock);
        spin_lock_init(&ls->ls_rcom_spin);
        get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
        ls->ls_recover_status = 0;
        ls->ls_recover_seq = 0;
        ls->ls_recover_args = NULL;
        init_rwsem(&ls->ls_in_recovery);
        init_rwsem(&ls->ls_recv_active);
        INIT_LIST_HEAD(&ls->ls_requestqueue);
        atomic_set(&ls->ls_requestqueue_cnt, 0);
        init_waitqueue_head(&ls->ls_requestqueue_wait);
        mutex_init(&ls->ls_requestqueue_mutex);
        spin_lock_init(&ls->ls_clear_proc_locks);

        /* For backwards compatibility with 3.1 we need to use the maximum
         * possible dlm message size to be sure the message will fit and
         * we will not run into out of bounds issues. However, the 3.2
         * sending side might send less.
         */
        ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
        if (!ls->ls_recover_buf)
                goto out_lkbidr;

        ls->ls_slot = 0;
        ls->ls_num_slots = 0;
        ls->ls_slots_size = 0;
        ls->ls_slots = NULL;

        INIT_LIST_HEAD(&ls->ls_recover_list);
        spin_lock_init(&ls->ls_recover_list_lock);
        idr_init(&ls->ls_recover_idr);
        spin_lock_init(&ls->ls_recover_idr_lock);
        ls->ls_recover_list_count = 0;
        ls->ls_local_handle = ls;
        init_waitqueue_head(&ls->ls_wait_general);
        INIT_LIST_HEAD(&ls->ls_root_list);
        init_rwsem(&ls->ls_root_sem);

        spin_lock(&lslist_lock);
        ls->ls_create_count = 1;
        list_add(&ls->ls_list, &lslist);
        spin_unlock(&lslist_lock);

        if (flags & DLM_LSFL_FS) {
                error = dlm_callback_start(ls);
                if (error) {
                        log_error(ls, "can't start dlm_callback %d", error);
                        goto out_delist;
                }
        }

        init_waitqueue_head(&ls->ls_recover_lock_wait);

        /*
         * Once started, dlm_recoverd first looks for ls in lslist, then
         * initializes ls_in_recovery as locked in "down" mode.  We need
         * to wait for the wakeup from dlm_recoverd because in_recovery
         * has to start out in down mode.
         */

        error = dlm_recoverd_start(ls);
        if (error) {
                log_error(ls, "can't start dlm_recoverd %d", error);
                goto out_callback;
        }

        wait_event(ls->ls_recover_lock_wait,
                   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));

        /* let kobject handle freeing of ls if there's an error */
        do_unreg = 1;

        ls->ls_kobj.kset = dlm_kset;
        error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
                                     "%s", ls->ls_name);
        if (error)
                goto out_recoverd;
        kobject_uevent(&ls->ls_kobj, KOBJ_ADD);

        /* This uevent triggers dlm_controld in userspace to add us to the
           group of nodes that are members of this lockspace (managed by the
           cluster infrastructure.)  Once it's done that, it tells us who the
           current lockspace members are (via configfs) and then tells the
           lockspace to start running (via sysfs) in dlm_ls_start(). */

        error = do_uevent(ls, 1);
        if (error)
                goto out_recoverd;

        /* wait until recovery is successful or failed */
        wait_for_completion(&ls->ls_recovery_done);
        error = ls->ls_recovery_result;
        if (error)
                goto out_members;

        dlm_create_debug_file(ls);

        log_rinfo(ls, "join complete");
        *lockspace = ls;
        return 0;

 out_members:
        do_uevent(ls, 0);
        dlm_clear_members(ls);
        kfree(ls->ls_node_array);
 out_recoverd:
        dlm_recoverd_stop(ls);
 out_callback:
        dlm_callback_stop(ls);
 out_delist:
        spin_lock(&lslist_lock);
        list_del(&ls->ls_list);
        spin_unlock(&lslist_lock);
        idr_destroy(&ls->ls_recover_idr);
        kfree(ls->ls_recover_buf);
 out_lkbidr:
        idr_destroy(&ls->ls_lkbidr);
 out_rsbtbl:
        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
                kfree(ls->ls_remove_names[i]);
        vfree(ls->ls_rsbtbl);
 out_lsfree:
        if (do_unreg)
                kobject_put(&ls->ls_kobj);
        else
                kfree(ls);
 out:
        module_put(THIS_MODULE);
        return error;
}

static int __dlm_new_lockspace(const char *name, const char *cluster,
                               uint32_t flags, int lvblen,
                               const struct dlm_lockspace_ops *ops,
                               void *ops_arg, int *ops_result,
                               dlm_lockspace_t **lockspace)
{
        int error = 0;

        mutex_lock(&ls_lock);
        if (!ls_count)
                error = threads_start();
        if (error)
                goto out;

        error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
                              ops_result, lockspace);
        if (!error)
                ls_count++;
        if (error > 0)
                error = 0;
        if (!ls_count) {
                dlm_scand_stop();
                dlm_midcomms_shutdown();
                dlm_midcomms_stop();
        }
 out:
        mutex_unlock(&ls_lock);
        return error;
}

int dlm_new_lockspace(const char *name, const char *cluster, uint32_t flags,
                      int lvblen, const struct dlm_lockspace_ops *ops,
                      void *ops_arg, int *ops_result,
                      dlm_lockspace_t **lockspace)
{
        return __dlm_new_lockspace(name, cluster, flags | DLM_LSFL_FS, lvblen,
                                   ops, ops_arg, ops_result, lockspace);
}

int dlm_new_user_lockspace(const char *name, const char *cluster,
                           uint32_t flags, int lvblen,
                           const struct dlm_lockspace_ops *ops,
                           void *ops_arg, int *ops_result,
                           dlm_lockspace_t **lockspace)
{
        return __dlm_new_lockspace(name, cluster, flags, lvblen, ops,
                                   ops_arg, ops_result, lockspace);
}
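/* A minimal, purely illustrative kernel-side caller (the lockspace and
 * cluster names here are hypothetical), assuming dlm_controld is running
 * and the node is a member of cluster "mycluster":
 *
 *      dlm_lockspace_t *ls;
 *      int error;
 *
 *      error = dlm_new_lockspace("example", "mycluster", DLM_LSFL_NEWEXCL,
 *                                64, NULL, NULL, NULL, &ls);
 *      if (!error)
 *              error = dlm_release_lockspace(ls, 0);
 *
 * lvblen (64 here) must be a multiple of 8, and DLM_LSFL_NEWEXCL makes
 * the call fail with -EEXIST if the lockspace already exists. */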
static int lkb_idr_is_local(int id, void *p, void *data)
{
        struct dlm_lkb *lkb = p;

        return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
}

static int lkb_idr_is_any(int id, void *p, void *data)
{
        return 1;
}

static int lkb_idr_free(int id, void *p, void *data)
{
        struct dlm_lkb *lkb = p;

        if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
                dlm_free_lvb(lkb->lkb_lvbptr);

        dlm_free_lkb(lkb);
        return 0;
}

/* NOTE: We check the lkbidr here rather than the resource table.
   This is because there may be LKBs queued as ASTs that have been unlinked
   from their RSBs and are pending deletion once the AST has been delivered */
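/* force == 0: busy if any lkb exists; force == 1: busy only if a local
   lkb holds a granted mode; force >= 2: never considered busy. */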
static int lockspace_busy(struct dlm_ls *ls, int force)
{
        int rv;

        spin_lock(&ls->ls_lkbidr_spin);
        if (force == 0) {
                rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
        } else if (force == 1) {
                rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
        } else {
                rv = 0;
        }
        spin_unlock(&ls->ls_lkbidr_spin);
        return rv;
}

static int release_lockspace(struct dlm_ls *ls, int force)
{
        struct dlm_rsb *rsb;
        struct rb_node *n;
        int i, busy, rv;

        busy = lockspace_busy(ls, force);

        spin_lock(&lslist_lock);
        if (ls->ls_create_count == 1) {
                if (busy) {
                        rv = -EBUSY;
                } else {
                        /* remove_lockspace takes ls off lslist */
                        ls->ls_create_count = 0;
                        rv = 0;
                }
        } else if (ls->ls_create_count > 1) {
                rv = --ls->ls_create_count;
        } else {
                rv = -EINVAL;
        }
        spin_unlock(&lslist_lock);

        if (rv) {
                log_debug(ls, "release_lockspace no remove %d", rv);
                return rv;
        }

        dlm_device_deregister(ls);

        if (force < 3 && dlm_user_daemon_available())
                do_uevent(ls, 0);

        dlm_recoverd_stop(ls);

        if (ls_count == 1) {
                dlm_scand_stop();
                dlm_clear_members(ls);
                dlm_midcomms_shutdown();
        }

        dlm_callback_stop(ls);

        remove_lockspace(ls);

        dlm_delete_debug_file(ls);

        idr_destroy(&ls->ls_recover_idr);
        kfree(ls->ls_recover_buf);

        /*
         * Free all lkb's in idr
         */

        idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
        idr_destroy(&ls->ls_lkbidr);

        /*
         * Free all rsb's on rsbtbl[] lists
         */

        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
                while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
                        rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
                        rb_erase(n, &ls->ls_rsbtbl[i].keep);
                        dlm_free_rsb(rsb);
                }

                while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
                        rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
                        rb_erase(n, &ls->ls_rsbtbl[i].toss);
                        dlm_free_rsb(rsb);
                }
        }

        vfree(ls->ls_rsbtbl);

        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
                kfree(ls->ls_remove_names[i]);

        while (!list_empty(&ls->ls_new_rsb)) {
                rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
                                       res_hashchain);
                list_del(&rsb->res_hashchain);
                dlm_free_rsb(rsb);
        }

        /*
         * Free structures on any other lists
         */

        dlm_purge_requestqueue(ls);
        kfree(ls->ls_recover_args);
        dlm_clear_members(ls);
        dlm_clear_members_gone(ls);
        kfree(ls->ls_node_array);
        log_rinfo(ls, "release_lockspace final free");
        kobject_put(&ls->ls_kobj);
        /* The ls structure will be freed when the kobject releases it */

        module_put(THIS_MODULE);
        return 0;
}

/*
 * Called when a system has released all its locks and is not going to use the
 * lockspace any longer.  We free everything we're managing for this lockspace.
 * Remaining nodes will go through the recovery process as if we'd died.  The
 * lockspace must continue to function as usual, participating in recoveries,
 * until this returns.
 *
 * Force has 4 possible values:
 * 0 - don't destroy lockspace if it has any LKBs
 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 * 2 - destroy lockspace regardless of LKBs
 * 3 - destroy lockspace as part of a forced shutdown
 */

int dlm_release_lockspace(void *lockspace, int force)
{
        struct dlm_ls *ls;
        int error;

        ls = dlm_find_lockspace_local(lockspace);
        if (!ls)
                return -EINVAL;
        dlm_put_lockspace(ls);

        mutex_lock(&ls_lock);
        error = release_lockspace(ls, force);
        if (!error)
                ls_count--;
        if (!ls_count)
                dlm_midcomms_stop();
        mutex_unlock(&ls_lock);

        return error;
}
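/* Called when the userland control daemon goes away: stop every lockspace
   that is still running so no locking activity continues without cluster
   management. */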
void dlm_stop_lockspaces(void)
{
        struct dlm_ls *ls;
        int count;

 restart:
        count = 0;
        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
                        count++;
                        continue;
                }
                spin_unlock(&lslist_lock);
                log_error(ls, "no userland control daemon, stopping lockspace");
                dlm_ls_stop(ls);
                goto restart;
        }
        spin_unlock(&lslist_lock);

        if (count)
                log_print("dlm user daemon left %d lockspaces", count);
}