arm64: dts: qcom: sm8550: add TRNG node
[linux-modified.git] / fs / dlm / lockspace.c
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
6 **  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
7 **
8 **
9 *******************************************************************************
10 ******************************************************************************/
11
12 #include <linux/module.h>
13
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "dir.h"
19 #include "midcomms.h"
20 #include "config.h"
21 #include "memory.h"
22 #include "lock.h"
23 #include "recover.h"
24 #include "requestqueue.h"
25 #include "user.h"
26 #include "ast.h"
27
28 static int                      ls_count;
29 static struct mutex             ls_lock;
30 static struct list_head         lslist;
31 static spinlock_t               lslist_lock;
32 static struct task_struct *     scand_task;
33
34
35 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
36 {
37         ssize_t ret = len;
38         int n;
39         int rc = kstrtoint(buf, 0, &n);
40
41         if (rc)
42                 return rc;
43         ls = dlm_find_lockspace_local(ls->ls_local_handle);
44         if (!ls)
45                 return -EINVAL;
46
47         switch (n) {
48         case 0:
49                 dlm_ls_stop(ls);
50                 break;
51         case 1:
52                 dlm_ls_start(ls);
53                 break;
54         default:
55                 ret = -EINVAL;
56         }
57         dlm_put_lockspace(ls);
58         return ret;
59 }
60
61 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
62 {
63         int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
64
65         if (rc)
66                 return rc;
67         set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
68         wake_up(&ls->ls_uevent_wait);
69         return len;
70 }
71
72 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
73 {
74         return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
75 }
76
77 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
78 {
79         int rc = kstrtouint(buf, 0, &ls->ls_global_id);
80
81         if (rc)
82                 return rc;
83         return len;
84 }
85
86 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
87 {
88         return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
89 }
90
91 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
92 {
93         int val;
94         int rc = kstrtoint(buf, 0, &val);
95
96         if (rc)
97                 return rc;
98         if (val == 1)
99                 set_bit(LSFL_NODIR, &ls->ls_flags);
100         return len;
101 }
102
103 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
104 {
105         uint32_t status = dlm_recover_status(ls);
106         return snprintf(buf, PAGE_SIZE, "%x\n", status);
107 }
108
109 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
110 {
111         return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
112 }
113
114 struct dlm_attr {
115         struct attribute attr;
116         ssize_t (*show)(struct dlm_ls *, char *);
117         ssize_t (*store)(struct dlm_ls *, const char *, size_t);
118 };
119
120 static struct dlm_attr dlm_attr_control = {
121         .attr  = {.name = "control", .mode = S_IWUSR},
122         .store = dlm_control_store
123 };
124
125 static struct dlm_attr dlm_attr_event = {
126         .attr  = {.name = "event_done", .mode = S_IWUSR},
127         .store = dlm_event_store
128 };
129
130 static struct dlm_attr dlm_attr_id = {
131         .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
132         .show  = dlm_id_show,
133         .store = dlm_id_store
134 };
135
136 static struct dlm_attr dlm_attr_nodir = {
137         .attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
138         .show  = dlm_nodir_show,
139         .store = dlm_nodir_store
140 };
141
142 static struct dlm_attr dlm_attr_recover_status = {
143         .attr  = {.name = "recover_status", .mode = S_IRUGO},
144         .show  = dlm_recover_status_show
145 };
146
147 static struct dlm_attr dlm_attr_recover_nodeid = {
148         .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
149         .show  = dlm_recover_nodeid_show
150 };
151
152 static struct attribute *dlm_attrs[] = {
153         &dlm_attr_control.attr,
154         &dlm_attr_event.attr,
155         &dlm_attr_id.attr,
156         &dlm_attr_nodir.attr,
157         &dlm_attr_recover_status.attr,
158         &dlm_attr_recover_nodeid.attr,
159         NULL,
160 };
161 ATTRIBUTE_GROUPS(dlm);
162
163 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
164                              char *buf)
165 {
166         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
167         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
168         return a->show ? a->show(ls, buf) : 0;
169 }
170
171 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
172                               const char *buf, size_t len)
173 {
174         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
175         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
176         return a->store ? a->store(ls, buf, len) : len;
177 }
178
179 static void lockspace_kobj_release(struct kobject *k)
180 {
181         struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
182         kfree(ls);
183 }
184
185 static const struct sysfs_ops dlm_attr_ops = {
186         .show  = dlm_attr_show,
187         .store = dlm_attr_store,
188 };
189
190 static struct kobj_type dlm_ktype = {
191         .default_groups = dlm_groups,
192         .sysfs_ops     = &dlm_attr_ops,
193         .release       = lockspace_kobj_release,
194 };
195
196 static struct kset *dlm_kset;
197
198 static int do_uevent(struct dlm_ls *ls, int in)
199 {
200         if (in)
201                 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
202         else
203                 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
204
205         log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
206
207         /* dlm_controld will see the uevent, do the necessary group management
208            and then write to sysfs to wake us */
209
210         wait_event(ls->ls_uevent_wait,
211                    test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
212
213         log_rinfo(ls, "group event done %d", ls->ls_uevent_result);
214
215         return ls->ls_uevent_result;
216 }
217
218 static int dlm_uevent(const struct kobject *kobj, struct kobj_uevent_env *env)
219 {
220         const struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
221
222         add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
223         return 0;
224 }
225
226 static const struct kset_uevent_ops dlm_uevent_ops = {
227         .uevent = dlm_uevent,
228 };
229
230 int __init dlm_lockspace_init(void)
231 {
232         ls_count = 0;
233         mutex_init(&ls_lock);
234         INIT_LIST_HEAD(&lslist);
235         spin_lock_init(&lslist_lock);
236
237         dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
238         if (!dlm_kset) {
239                 printk(KERN_WARNING "%s: can not create kset\n", __func__);
240                 return -ENOMEM;
241         }
242         return 0;
243 }
244
245 void dlm_lockspace_exit(void)
246 {
247         kset_unregister(dlm_kset);
248 }
249
250 static struct dlm_ls *find_ls_to_scan(void)
251 {
252         struct dlm_ls *ls;
253
254         spin_lock(&lslist_lock);
255         list_for_each_entry(ls, &lslist, ls_list) {
256                 if (time_after_eq(jiffies, ls->ls_scan_time +
257                                             dlm_config.ci_scan_secs * HZ)) {
258                         spin_unlock(&lslist_lock);
259                         return ls;
260                 }
261         }
262         spin_unlock(&lslist_lock);
263         return NULL;
264 }
265
266 static int dlm_scand(void *data)
267 {
268         struct dlm_ls *ls;
269
270         while (!kthread_should_stop()) {
271                 ls = find_ls_to_scan();
272                 if (ls) {
273                         if (dlm_lock_recovery_try(ls)) {
274                                 ls->ls_scan_time = jiffies;
275                                 dlm_scan_rsbs(ls);
276                                 dlm_unlock_recovery(ls);
277                         } else {
278                                 ls->ls_scan_time += HZ;
279                         }
280                         continue;
281                 }
282                 schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
283         }
284         return 0;
285 }
286
287 static int dlm_scand_start(void)
288 {
289         struct task_struct *p;
290         int error = 0;
291
292         p = kthread_run(dlm_scand, NULL, "dlm_scand");
293         if (IS_ERR(p))
294                 error = PTR_ERR(p);
295         else
296                 scand_task = p;
297         return error;
298 }
299
300 static void dlm_scand_stop(void)
301 {
302         kthread_stop(scand_task);
303 }
304
305 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
306 {
307         struct dlm_ls *ls;
308
309         spin_lock(&lslist_lock);
310
311         list_for_each_entry(ls, &lslist, ls_list) {
312                 if (ls->ls_global_id == id) {
313                         atomic_inc(&ls->ls_count);
314                         goto out;
315                 }
316         }
317         ls = NULL;
318  out:
319         spin_unlock(&lslist_lock);
320         return ls;
321 }
322
323 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
324 {
325         struct dlm_ls *ls;
326
327         spin_lock(&lslist_lock);
328         list_for_each_entry(ls, &lslist, ls_list) {
329                 if (ls->ls_local_handle == lockspace) {
330                         atomic_inc(&ls->ls_count);
331                         goto out;
332                 }
333         }
334         ls = NULL;
335  out:
336         spin_unlock(&lslist_lock);
337         return ls;
338 }
339
340 struct dlm_ls *dlm_find_lockspace_device(int minor)
341 {
342         struct dlm_ls *ls;
343
344         spin_lock(&lslist_lock);
345         list_for_each_entry(ls, &lslist, ls_list) {
346                 if (ls->ls_device.minor == minor) {
347                         atomic_inc(&ls->ls_count);
348                         goto out;
349                 }
350         }
351         ls = NULL;
352  out:
353         spin_unlock(&lslist_lock);
354         return ls;
355 }
356
357 void dlm_put_lockspace(struct dlm_ls *ls)
358 {
359         if (atomic_dec_and_test(&ls->ls_count))
360                 wake_up(&ls->ls_count_wait);
361 }
362
363 static void remove_lockspace(struct dlm_ls *ls)
364 {
365 retry:
366         wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0);
367
368         spin_lock(&lslist_lock);
369         if (atomic_read(&ls->ls_count) != 0) {
370                 spin_unlock(&lslist_lock);
371                 goto retry;
372         }
373
374         WARN_ON(ls->ls_create_count != 0);
375         list_del(&ls->ls_list);
376         spin_unlock(&lslist_lock);
377 }
378
/*
 * Start the services shared by all lockspaces: midcomms first, then the
 * dlm_scand thread.  If dlm_scand cannot be started, midcomms is stopped
 * again.  Returns 0 or the error of the step that failed.
 */
static int threads_start(void)
{
	int rv;

	/* Thread for sending/receiving messages for all lockspace's */
	rv = dlm_midcomms_start();
	if (rv) {
		log_print("cannot start dlm midcomms %d", rv);
		return rv;
	}

	rv = dlm_scand_start();
	if (rv) {
		log_print("cannot start dlm_scand thread %d", rv);
		dlm_midcomms_stop();
		return rv;
	}

	return 0;
}
403
/*
 * new_lockspace() - create a lockspace, or take another creation reference
 * on an existing lockspace that has the same name.
 *
 * @name:       lockspace name; its length must be 1..DLM_LOCKSPACE_LEN
 * @cluster:    optional cluster name; when given and
 *              dlm_config.ci_recover_callbacks is set it must match
 *              dlm_config.ci_cluster_name
 * @flags:      DLM_LSFL_* flags.  DLM_LSFL_NEWEXCL turns an existing lockspace
 *              of the same name into an error, DLM_LSFL_FS makes this function
 *              call dlm_callback_start().  Both are masked out of ls_exflags.
 * @lvblen:     must be a multiple of 8; stored in ls_lvblen
 * @ops:        recovery callbacks, stored only when
 *              dlm_config.ci_recover_callbacks is set
 * @ops_arg:    argument stored alongside @ops
 * @ops_result: if both @ops and @ops_result are non-NULL, receives
 *              -EOPNOTSUPP when recovery callbacks are disabled, else 0
 * @lockspace:  receives the new or the already existing lockspace
 *
 * Returns 0 when a new lockspace was created, 1 when an existing one was
 * reused (its ls_create_count is incremented), or a negative errno.
 *
 * The only caller in this file, __dlm_new_lockspace(), holds ls_lock.
 */
404 static int new_lockspace(const char *name, const char *cluster,
405                          uint32_t flags, int lvblen,
406                          const struct dlm_lockspace_ops *ops, void *ops_arg,
407                          int *ops_result, dlm_lockspace_t **lockspace)
408 {
409         struct dlm_ls *ls;
410         int i, size, error;
411         int do_unreg = 0;
412         int namelen = strlen(name);
413
414         if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
415                 return -EINVAL;
416
417         if (lvblen % 8)
418                 return -EINVAL;
419
            /*
             * This module reference is dropped again at "out" on every path
             * that reaches it, including the "existing lockspace" return of
             * 1.  Only the successful creation of a new lockspace keeps it;
             * release_lockspace() drops it on final teardown.
             */
420         if (!try_module_get(THIS_MODULE))
421                 return -EINVAL;
422
423         if (!dlm_user_daemon_available()) {
424                 log_print("dlm user daemon not available");
425                 error = -EUNATCH;
426                 goto out;
427         }
428
429         if (ops && ops_result) {
430                 if (!dlm_config.ci_recover_callbacks)
431                         *ops_result = -EOPNOTSUPP;
432                 else
433                         *ops_result = 0;
434         }
435
436         if (!cluster)
437                 log_print("dlm cluster name '%s' is being used without an application provided cluster name",
438                           dlm_config.ci_cluster_name);
439
440         if (dlm_config.ci_recover_callbacks && cluster &&
441             strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
442                 log_print("dlm cluster name '%s' does not match "
443                           "the application cluster name '%s'",
444                           dlm_config.ci_cluster_name, cluster);
445                 error = -EBADR;
446                 goto out;
447         }
448
449         error = 0;
450
            /*
             * Look for an existing lockspace with this name.  After the loop
             * error is 0 (not found, create one below), 1 (found and reused)
             * or -EEXIST (found, but the caller asked for DLM_LSFL_NEWEXCL).
             */
451         spin_lock(&lslist_lock);
452         list_for_each_entry(ls, &lslist, ls_list) {
453                 WARN_ON(ls->ls_create_count <= 0);
454                 if (ls->ls_namelen != namelen)
455                         continue;
456                 if (memcmp(ls->ls_name, name, namelen))
457                         continue;
458                 if (flags & DLM_LSFL_NEWEXCL) {
459                         error = -EEXIST;
460                         break;
461                 }
462                 ls->ls_create_count++;
463                 *lockspace = ls;
464                 error = 1;
465                 break;
466         }
467         spin_unlock(&lslist_lock);
468
469         if (error)
470                 goto out;
471
472         error = -ENOMEM;
473
            /* kzalloc() zeroes ls, so fields not set below start out as 0. */
474         ls = kzalloc(sizeof(*ls), GFP_NOFS);
475         if (!ls)
476                 goto out;
477         memcpy(ls->ls_name, name, namelen);
478         ls->ls_namelen = namelen;
479         ls->ls_lvblen = lvblen;
480         atomic_set(&ls->ls_count, 0);
481         init_waitqueue_head(&ls->ls_count_wait);
482         ls->ls_flags = 0;
483         ls->ls_scan_time = jiffies;
484
485         if (ops && dlm_config.ci_recover_callbacks) {
486                 ls->ls_ops = ops;
487                 ls->ls_ops_arg = ops_arg;
488         }
489
490         /* ls_exflags are forced to match among nodes, and we don't
491          * need to require all nodes to have some flags set
492          */
493         ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL));
494
495         size = READ_ONCE(dlm_config.ci_rsbtbl_size);
496         ls->ls_rsbtbl_size = size;
497
498         ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
499         if (!ls->ls_rsbtbl)
500                 goto out_lsfree;
501         for (i = 0; i < size; i++) {
502                 ls->ls_rsbtbl[i].keep.rb_node = NULL;
503                 ls->ls_rsbtbl[i].toss.rb_node = NULL;
504                 spin_lock_init(&ls->ls_rsbtbl[i].lock);
505         }
506
            /*
             * If an allocation fails part-way, the remaining entries are
             * still NULL from kzalloc() and out_rsbtbl can kfree() them all.
             */
507         for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
508                 ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
509                                                  GFP_KERNEL);
510                 if (!ls->ls_remove_names[i])
511                         goto out_rsbtbl;
512         }
513
514         idr_init(&ls->ls_lkbidr);
515         spin_lock_init(&ls->ls_lkbidr_spin);
516
517         INIT_LIST_HEAD(&ls->ls_waiters);
518         mutex_init(&ls->ls_waiters_mutex);
519         INIT_LIST_HEAD(&ls->ls_orphans);
520         mutex_init(&ls->ls_orphans_mutex);
521
522         INIT_LIST_HEAD(&ls->ls_new_rsb);
523         spin_lock_init(&ls->ls_new_rsb_spin);
524
525         INIT_LIST_HEAD(&ls->ls_nodes);
526         INIT_LIST_HEAD(&ls->ls_nodes_gone);
527         ls->ls_num_nodes = 0;
528         ls->ls_low_nodeid = 0;
529         ls->ls_total_weight = 0;
530         ls->ls_node_array = NULL;
531
532         memset(&ls->ls_local_rsb, 0, sizeof(struct dlm_rsb));
533         ls->ls_local_rsb.res_ls = ls;
534
535         ls->ls_debug_rsb_dentry = NULL;
536         ls->ls_debug_waiters_dentry = NULL;
537
538         init_waitqueue_head(&ls->ls_uevent_wait);
539         ls->ls_uevent_result = 0;
540         init_completion(&ls->ls_recovery_done);
541         ls->ls_recovery_result = -1;
542
543         spin_lock_init(&ls->ls_cb_lock);
544         INIT_LIST_HEAD(&ls->ls_cb_delay);
545
546         ls->ls_recoverd_task = NULL;
547         mutex_init(&ls->ls_recoverd_active);
548         spin_lock_init(&ls->ls_recover_lock);
549         spin_lock_init(&ls->ls_rcom_spin);
550         get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
551         ls->ls_recover_status = 0;
552         ls->ls_recover_seq = get_random_u64();
553         ls->ls_recover_args = NULL;
554         init_rwsem(&ls->ls_in_recovery);
555         init_rwsem(&ls->ls_recv_active);
556         INIT_LIST_HEAD(&ls->ls_requestqueue);
557         atomic_set(&ls->ls_requestqueue_cnt, 0);
558         init_waitqueue_head(&ls->ls_requestqueue_wait);
559         mutex_init(&ls->ls_requestqueue_mutex);
560         spin_lock_init(&ls->ls_clear_proc_locks);
561
562         /* Due backwards compatibility with 3.1 we need to use maximum
563          * possible dlm message size to be sure the message will fit and
564          * not having out of bounds issues. However on sending side 3.2
565          * might send less.
566          */
567         ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS);
568         if (!ls->ls_recover_buf)
569                 goto out_lkbidr;
570
571         ls->ls_slot = 0;
572         ls->ls_num_slots = 0;
573         ls->ls_slots_size = 0;
574         ls->ls_slots = NULL;
575
576         INIT_LIST_HEAD(&ls->ls_recover_list);
577         spin_lock_init(&ls->ls_recover_list_lock);
578         idr_init(&ls->ls_recover_idr);
579         spin_lock_init(&ls->ls_recover_idr_lock);
580         ls->ls_recover_list_count = 0;
581         ls->ls_local_handle = ls;
582         init_waitqueue_head(&ls->ls_wait_general);
583         INIT_LIST_HEAD(&ls->ls_root_list);
584         init_rwsem(&ls->ls_root_sem);
585
            /*
             * From here on ls is on lslist and can be found by the
             * dlm_find_lockspace_*() helpers and by find_ls_to_scan().
             */
586         spin_lock(&lslist_lock);
587         ls->ls_create_count = 1;
588         list_add(&ls->ls_list, &lslist);
589         spin_unlock(&lslist_lock);
590
591         if (flags & DLM_LSFL_FS) {
592                 error = dlm_callback_start(ls);
593                 if (error) {
594                         log_error(ls, "can't start dlm_callback %d", error);
595                         goto out_delist;
596                 }
597         }
598
599         init_waitqueue_head(&ls->ls_recover_lock_wait);
600
601         /*
602          * Once started, dlm_recoverd first looks for ls in lslist, then
603          * initializes ls_in_recovery as locked in "down" mode.  We need
604          * to wait for the wakeup from dlm_recoverd because in_recovery
605          * has to start out in down mode.
606          */
607
608         error = dlm_recoverd_start(ls);
609         if (error) {
610                 log_error(ls, "can't start dlm_recoverd %d", error);
611                 goto out_callback;
612         }
613
614         wait_event(ls->ls_recover_lock_wait,
615                    test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
616
617         /* let kobject handle freeing of ls if there's an error */
618         do_unreg = 1;
619
620         ls->ls_kobj.kset = dlm_kset;
621         error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
622                                      "%s", ls->ls_name);
623         if (error)
624                 goto out_recoverd;
625         kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
626
627         /* This uevent triggers dlm_controld in userspace to add us to the
628            group of nodes that are members of this lockspace (managed by the
629            cluster infrastructure.)  Once it's done that, it tells us who the
630            current lockspace members are (via configfs) and then tells the
631            lockspace to start running (via sysfs) in dlm_ls_start(). */
632
633         error = do_uevent(ls, 1);
634         if (error)
635                 goto out_recoverd;
636
637         /* wait until recovery is successful or failed */
638         wait_for_completion(&ls->ls_recovery_done);
639         error = ls->ls_recovery_result;
640         if (error)
641                 goto out_members;
642
643         dlm_create_debug_file(ls);
644
645         log_rinfo(ls, "join complete");
646         *lockspace = ls;
647         return 0;
648
            /*
             * Error unwinding: each label undoes the setup steps completed
             * before the failing one, in reverse order, and falls through to
             * the labels below it.
             */
649  out_members:
650         do_uevent(ls, 0);
651         dlm_clear_members(ls);
652         kfree(ls->ls_node_array);
653  out_recoverd:
654         dlm_recoverd_stop(ls);
655  out_callback:
656         dlm_callback_stop(ls);
657  out_delist:
658         spin_lock(&lslist_lock);
659         list_del(&ls->ls_list);
660         spin_unlock(&lslist_lock);
661         idr_destroy(&ls->ls_recover_idr);
662         kfree(ls->ls_recover_buf);
663  out_lkbidr:
664         idr_destroy(&ls->ls_lkbidr);
665  out_rsbtbl:
666         for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
667                 kfree(ls->ls_remove_names[i]);
668         vfree(ls->ls_rsbtbl);
669  out_lsfree:
            /*
             * Once do_unreg is set, ls is freed through kobject_put(), whose
             * release hook in dlm_ktype is lockspace_kobj_release().
             */
670         if (do_unreg)
671                 kobject_put(&ls->ls_kobj);
672         else
673                 kfree(ls);
674  out:
675         module_put(THIS_MODULE);
676         return error;
677 }
678
679 static int __dlm_new_lockspace(const char *name, const char *cluster,
680                                uint32_t flags, int lvblen,
681                                const struct dlm_lockspace_ops *ops,
682                                void *ops_arg, int *ops_result,
683                                dlm_lockspace_t **lockspace)
684 {
685         int error = 0;
686
687         mutex_lock(&ls_lock);
688         if (!ls_count)
689                 error = threads_start();
690         if (error)
691                 goto out;
692
693         error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
694                               ops_result, lockspace);
695         if (!error)
696                 ls_count++;
697         if (error > 0)
698                 error = 0;
699         if (!ls_count) {
700                 dlm_scand_stop();
701                 dlm_midcomms_shutdown();
702                 dlm_midcomms_stop();
703         }
704  out:
705         mutex_unlock(&ls_lock);
706         return error;
707 }
708
709 int dlm_new_lockspace(const char *name, const char *cluster, uint32_t flags,
710                       int lvblen, const struct dlm_lockspace_ops *ops,
711                       void *ops_arg, int *ops_result,
712                       dlm_lockspace_t **lockspace)
713 {
714         return __dlm_new_lockspace(name, cluster, flags | DLM_LSFL_FS, lvblen,
715                                    ops, ops_arg, ops_result, lockspace);
716 }
717
718 int dlm_new_user_lockspace(const char *name, const char *cluster,
719                            uint32_t flags, int lvblen,
720                            const struct dlm_lockspace_ops *ops,
721                            void *ops_arg, int *ops_result,
722                            dlm_lockspace_t **lockspace)
723 {
724         return __dlm_new_lockspace(name, cluster, flags, lvblen, ops,
725                                    ops_arg, ops_result, lockspace);
726 }
727
728 static int lkb_idr_is_local(int id, void *p, void *data)
729 {
730         struct dlm_lkb *lkb = p;
731
732         return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
733 }
734
/*
 * idr_for_each() callback that matches every entry: it returns 1 without
 * looking at its arguments.
 */
static int lkb_idr_is_any(int id, void *p, void *data)
{
	const int matched = 1;

	return matched;
}
739
740 static int lkb_idr_free(int id, void *p, void *data)
741 {
742         struct dlm_lkb *lkb = p;
743
744         if (lkb->lkb_lvbptr && test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
745                 dlm_free_lvb(lkb->lkb_lvbptr);
746
747         dlm_free_lkb(lkb);
748         return 0;
749 }
750
751 /* NOTE: We check the lkbidr here rather than the resource table.
752    This is because there may be LKBs queued as ASTs that have been unlinked
753    from their RSBs and are pending deletion once the AST has been delivered */
754
755 static int lockspace_busy(struct dlm_ls *ls, int force)
756 {
757         int rv;
758
759         spin_lock(&ls->ls_lkbidr_spin);
760         if (force == 0) {
761                 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
762         } else if (force == 1) {
763                 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
764         } else {
765                 rv = 0;
766         }
767         spin_unlock(&ls->ls_lkbidr_spin);
768         return rv;
769 }
770
/*
 * release_lockspace() - drop one creation reference on @ls and tear the
 * lockspace down when it was the last one.
 *
 * @ls:    lockspace to release
 * @force: handed to lockspace_busy() to decide whether remaining lkbs block
 *         the release; for values below 3 the "leaving" uevent is also sent
 *         when the dlm user daemon is available
 *
 * Returns 0 when the lockspace was torn down, the remaining ls_create_count
 * (a positive number) when other creators still hold it, -EBUSY when this was
 * the last creation reference but lockspace_busy() reported lkbs, or -EINVAL
 * when ls_create_count was not positive.
 *
 * The only caller in this file, dlm_release_lockspace(), holds ls_lock.
 */
771 static int release_lockspace(struct dlm_ls *ls, int force)
772 {
773         struct dlm_rsb *rsb;
774         struct rb_node *n;
775         int i, busy, rv;
776
777         busy = lockspace_busy(ls, force);
778
            /*
             * Decide under lslist_lock whether this call removes the
             * lockspace (rv == 0) or only drops a creation reference.
             */
779         spin_lock(&lslist_lock);
780         if (ls->ls_create_count == 1) {
781                 if (busy) {
782                         rv = -EBUSY;
783                 } else {
784                         /* remove_lockspace takes ls off lslist */
785                         ls->ls_create_count = 0;
786                         rv = 0;
787                 }
788         } else if (ls->ls_create_count > 1) {
789                 rv = --ls->ls_create_count;
790         } else {
791                 rv = -EINVAL;
792         }
793         spin_unlock(&lslist_lock);
794
795         if (rv) {
796                 log_debug(ls, "release_lockspace no remove %d", rv);
797                 return rv;
798         }
799
            /* ls_count == 1: this is the only lockspace that still exists. */
800         if (ls_count == 1)
801                 dlm_midcomms_version_wait();
802
803         dlm_device_deregister(ls);
804
805         if (force < 3 && dlm_user_daemon_available())
806                 do_uevent(ls, 0);
807
808         dlm_recoverd_stop(ls);
809
810         if (ls_count == 1) {
811                 dlm_scand_stop();
812                 dlm_clear_members(ls);
813                 dlm_midcomms_shutdown();
814         }
815
816         dlm_callback_stop(ls);
817
            /* Waits for ls_count references to drain, then unlinks ls. */
818         remove_lockspace(ls);
819
820         dlm_delete_debug_file(ls);
821
822         idr_destroy(&ls->ls_recover_idr);
823         kfree(ls->ls_recover_buf);
824
825         /*
826          * Free all lkb's in idr
827          */
828
829         idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
830         idr_destroy(&ls->ls_lkbidr);
831
832         /*
833          * Free all rsb's on rsbtbl[] lists
834          */
835
836         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
837                 while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
838                         rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
839                         rb_erase(n, &ls->ls_rsbtbl[i].keep);
840                         dlm_free_rsb(rsb);
841                 }
842
843                 while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
844                         rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
845                         rb_erase(n, &ls->ls_rsbtbl[i].toss);
846                         dlm_free_rsb(rsb);
847                 }
848         }
849
850         vfree(ls->ls_rsbtbl);
851
852         for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
853                 kfree(ls->ls_remove_names[i]);
854
855         while (!list_empty(&ls->ls_new_rsb)) {
856                 rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
857                                        res_hashchain);
858                 list_del(&rsb->res_hashchain);
859                 dlm_free_rsb(rsb);
860         }
861
862         /*
863          * Free structures on any other lists
864          */
865
866         dlm_purge_requestqueue(ls);
867         kfree(ls->ls_recover_args);
868         dlm_clear_members(ls);
869         dlm_clear_members_gone(ls);
870         kfree(ls->ls_node_array);
871         log_rinfo(ls, "release_lockspace final free");
872         kobject_put(&ls->ls_kobj);
873         /* The ls structure will be freed when the kobject is done with */
874
            /* Drops the module reference kept by new_lockspace() on success. */
875         module_put(THIS_MODULE);
876         return 0;
877 }
878
879 /*
880  * Called when a system has released all its locks and is not going to use the
881  * lockspace any longer.  We free everything we're managing for this lockspace.
882  * Remaining nodes will go through the recovery process as if we'd died.  The
883  * lockspace must continue to function as usual, participating in recoveries,
884  * until this returns.
885  *
886  * Force has 4 possible values:
887  * 0 - don't destroy lockspace if it has any LKBs
888  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
889  * 2 - destroy lockspace regardless of LKBs
890  * 3 - destroy lockspace as part of a forced shutdown
891  */
892
893 int dlm_release_lockspace(void *lockspace, int force)
894 {
895         struct dlm_ls *ls;
896         int error;
897
898         ls = dlm_find_lockspace_local(lockspace);
899         if (!ls)
900                 return -EINVAL;
901         dlm_put_lockspace(ls);
902
903         mutex_lock(&ls_lock);
904         error = release_lockspace(ls, force);
905         if (!error)
906                 ls_count--;
907         if (!ls_count)
908                 dlm_midcomms_stop();
909         mutex_unlock(&ls_lock);
910
911         return error;
912 }
913
914 void dlm_stop_lockspaces(void)
915 {
916         struct dlm_ls *ls;
917         int count;
918
919  restart:
920         count = 0;
921         spin_lock(&lslist_lock);
922         list_for_each_entry(ls, &lslist, ls_list) {
923                 if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
924                         count++;
925                         continue;
926                 }
927                 spin_unlock(&lslist_lock);
928                 log_error(ls, "no userland control daemon, stopping lockspace");
929                 dlm_ls_stop(ls);
930                 goto restart;
931         }
932         spin_unlock(&lslist_lock);
933
934         if (count)
935                 log_print("dlm user daemon left %d lockspaces", count);
936 }
937