GNU Linux-libre 4.9.308-gnu1
[releases.git] / fs / dlm / lockspace.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "dir.h"
19 #include "lowcomms.h"
20 #include "config.h"
21 #include "memory.h"
22 #include "lock.h"
23 #include "recover.h"
24 #include "requestqueue.h"
25 #include "user.h"
26 #include "ast.h"
27
28 static int                      ls_count;
29 static struct mutex             ls_lock;
30 static struct list_head         lslist;
31 static spinlock_t               lslist_lock;
32 static struct task_struct *     scand_task;
33
34
35 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
36 {
37         ssize_t ret = len;
38         int n;
39         int rc = kstrtoint(buf, 0, &n);
40
41         if (rc)
42                 return rc;
43         ls = dlm_find_lockspace_local(ls->ls_local_handle);
44         if (!ls)
45                 return -EINVAL;
46
47         switch (n) {
48         case 0:
49                 dlm_ls_stop(ls);
50                 break;
51         case 1:
52                 dlm_ls_start(ls);
53                 break;
54         default:
55                 ret = -EINVAL;
56         }
57         dlm_put_lockspace(ls);
58         return ret;
59 }
60
61 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
62 {
63         int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);
64
65         if (rc)
66                 return rc;
67         set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
68         wake_up(&ls->ls_uevent_wait);
69         return len;
70 }
71
72 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
73 {
74         return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
75 }
76
77 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
78 {
79         int rc = kstrtouint(buf, 0, &ls->ls_global_id);
80
81         if (rc)
82                 return rc;
83         return len;
84 }
85
86 static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
87 {
88         return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
89 }
90
91 static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
92 {
93         int val;
94         int rc = kstrtoint(buf, 0, &val);
95
96         if (rc)
97                 return rc;
98         if (val == 1)
99                 set_bit(LSFL_NODIR, &ls->ls_flags);
100         return len;
101 }
102
103 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
104 {
105         uint32_t status = dlm_recover_status(ls);
106         return snprintf(buf, PAGE_SIZE, "%x\n", status);
107 }
108
109 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
110 {
111         return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
112 }
113
114 struct dlm_attr {
115         struct attribute attr;
116         ssize_t (*show)(struct dlm_ls *, char *);
117         ssize_t (*store)(struct dlm_ls *, const char *, size_t);
118 };
119
120 static struct dlm_attr dlm_attr_control = {
121         .attr  = {.name = "control", .mode = S_IWUSR},
122         .store = dlm_control_store
123 };
124
125 static struct dlm_attr dlm_attr_event = {
126         .attr  = {.name = "event_done", .mode = S_IWUSR},
127         .store = dlm_event_store
128 };
129
130 static struct dlm_attr dlm_attr_id = {
131         .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
132         .show  = dlm_id_show,
133         .store = dlm_id_store
134 };
135
136 static struct dlm_attr dlm_attr_nodir = {
137         .attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
138         .show  = dlm_nodir_show,
139         .store = dlm_nodir_store
140 };
141
142 static struct dlm_attr dlm_attr_recover_status = {
143         .attr  = {.name = "recover_status", .mode = S_IRUGO},
144         .show  = dlm_recover_status_show
145 };
146
147 static struct dlm_attr dlm_attr_recover_nodeid = {
148         .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
149         .show  = dlm_recover_nodeid_show
150 };
151
152 static struct attribute *dlm_attrs[] = {
153         &dlm_attr_control.attr,
154         &dlm_attr_event.attr,
155         &dlm_attr_id.attr,
156         &dlm_attr_nodir.attr,
157         &dlm_attr_recover_status.attr,
158         &dlm_attr_recover_nodeid.attr,
159         NULL,
160 };
161
162 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
163                              char *buf)
164 {
165         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
166         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
167         return a->show ? a->show(ls, buf) : 0;
168 }
169
170 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
171                               const char *buf, size_t len)
172 {
173         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
174         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
175         return a->store ? a->store(ls, buf, len) : len;
176 }
177
178 static void lockspace_kobj_release(struct kobject *k)
179 {
180         struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
181         kfree(ls);
182 }
183
184 static const struct sysfs_ops dlm_attr_ops = {
185         .show  = dlm_attr_show,
186         .store = dlm_attr_store,
187 };
188
189 static struct kobj_type dlm_ktype = {
190         .default_attrs = dlm_attrs,
191         .sysfs_ops     = &dlm_attr_ops,
192         .release       = lockspace_kobj_release,
193 };
194
195 static struct kset *dlm_kset;
196
197 static int do_uevent(struct dlm_ls *ls, int in)
198 {
199         int error;
200
201         if (in)
202                 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
203         else
204                 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
205
206         log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");
207
208         /* dlm_controld will see the uevent, do the necessary group management
209            and then write to sysfs to wake us */
210
211         error = wait_event_interruptible(ls->ls_uevent_wait,
212                         test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
213
214         log_rinfo(ls, "group event done %d %d", error, ls->ls_uevent_result);
215
216         if (error)
217                 goto out;
218
219         error = ls->ls_uevent_result;
220  out:
221         if (error)
222                 log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
223                           error, ls->ls_uevent_result);
224         return error;
225 }
226
227 static int dlm_uevent(struct kset *kset, struct kobject *kobj,
228                       struct kobj_uevent_env *env)
229 {
230         struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
231
232         add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
233         return 0;
234 }
235
236 static struct kset_uevent_ops dlm_uevent_ops = {
237         .uevent = dlm_uevent,
238 };
239
240 int __init dlm_lockspace_init(void)
241 {
242         ls_count = 0;
243         mutex_init(&ls_lock);
244         INIT_LIST_HEAD(&lslist);
245         spin_lock_init(&lslist_lock);
246
247         dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
248         if (!dlm_kset) {
249                 printk(KERN_WARNING "%s: can not create kset\n", __func__);
250                 return -ENOMEM;
251         }
252         return 0;
253 }
254
255 void dlm_lockspace_exit(void)
256 {
257         kset_unregister(dlm_kset);
258 }
259
260 static struct dlm_ls *find_ls_to_scan(void)
261 {
262         struct dlm_ls *ls;
263
264         spin_lock(&lslist_lock);
265         list_for_each_entry(ls, &lslist, ls_list) {
266                 if (time_after_eq(jiffies, ls->ls_scan_time +
267                                             dlm_config.ci_scan_secs * HZ)) {
268                         spin_unlock(&lslist_lock);
269                         return ls;
270                 }
271         }
272         spin_unlock(&lslist_lock);
273         return NULL;
274 }
275
276 static int dlm_scand(void *data)
277 {
278         struct dlm_ls *ls;
279
280         while (!kthread_should_stop()) {
281                 ls = find_ls_to_scan();
282                 if (ls) {
283                         if (dlm_lock_recovery_try(ls)) {
284                                 ls->ls_scan_time = jiffies;
285                                 dlm_scan_rsbs(ls);
286                                 dlm_scan_timeout(ls);
287                                 dlm_scan_waiters(ls);
288                                 dlm_unlock_recovery(ls);
289                         } else {
290                                 ls->ls_scan_time += HZ;
291                         }
292                         continue;
293                 }
294                 schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
295         }
296         return 0;
297 }
298
299 static int dlm_scand_start(void)
300 {
301         struct task_struct *p;
302         int error = 0;
303
304         p = kthread_run(dlm_scand, NULL, "dlm_scand");
305         if (IS_ERR(p))
306                 error = PTR_ERR(p);
307         else
308                 scand_task = p;
309         return error;
310 }
311
312 static void dlm_scand_stop(void)
313 {
314         kthread_stop(scand_task);
315 }
316
317 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
318 {
319         struct dlm_ls *ls;
320
321         spin_lock(&lslist_lock);
322
323         list_for_each_entry(ls, &lslist, ls_list) {
324                 if (ls->ls_global_id == id) {
325                         ls->ls_count++;
326                         goto out;
327                 }
328         }
329         ls = NULL;
330  out:
331         spin_unlock(&lslist_lock);
332         return ls;
333 }
334
335 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
336 {
337         struct dlm_ls *ls;
338
339         spin_lock(&lslist_lock);
340         list_for_each_entry(ls, &lslist, ls_list) {
341                 if (ls->ls_local_handle == lockspace) {
342                         ls->ls_count++;
343                         goto out;
344                 }
345         }
346         ls = NULL;
347  out:
348         spin_unlock(&lslist_lock);
349         return ls;
350 }
351
352 struct dlm_ls *dlm_find_lockspace_device(int minor)
353 {
354         struct dlm_ls *ls;
355
356         spin_lock(&lslist_lock);
357         list_for_each_entry(ls, &lslist, ls_list) {
358                 if (ls->ls_device.minor == minor) {
359                         ls->ls_count++;
360                         goto out;
361                 }
362         }
363         ls = NULL;
364  out:
365         spin_unlock(&lslist_lock);
366         return ls;
367 }
368
369 void dlm_put_lockspace(struct dlm_ls *ls)
370 {
371         spin_lock(&lslist_lock);
372         ls->ls_count--;
373         spin_unlock(&lslist_lock);
374 }
375
376 static void remove_lockspace(struct dlm_ls *ls)
377 {
378         for (;;) {
379                 spin_lock(&lslist_lock);
380                 if (ls->ls_count == 0) {
381                         WARN_ON(ls->ls_create_count != 0);
382                         list_del(&ls->ls_list);
383                         spin_unlock(&lslist_lock);
384                         return;
385                 }
386                 spin_unlock(&lslist_lock);
387                 ssleep(1);
388         }
389 }
390
/*
 * Start the threads shared by all lockspaces: dlm_scand, then lowcomms.
 * If lowcomms fails to start, dlm_scand is stopped again.
 * Returns 0 or the error of the step that failed.
 */
static int threads_start(void)
{
	int rv;

	rv = dlm_scand_start();
	if (rv) {
		log_print("cannot start dlm_scand thread %d", rv);
		return rv;
	}

	/* Thread for sending/receiving messages for all lockspace's */
	rv = dlm_lowcomms_start();
	if (rv) {
		log_print("cannot start dlm lowcomms %d", rv);
		dlm_scand_stop();
		return rv;
	}

	return 0;
}
415
/* Stop what threads_start() started: dlm_scand first, then lowcomms. */
static void threads_stop(void)
{
	dlm_scand_stop();
	dlm_lowcomms_stop();
}
421
422 static int new_lockspace(const char *name, const char *cluster,
423                          uint32_t flags, int lvblen,
424                          const struct dlm_lockspace_ops *ops, void *ops_arg,
425                          int *ops_result, dlm_lockspace_t **lockspace)
426 {
427         struct dlm_ls *ls;
428         int i, size, error;
429         int do_unreg = 0;
430         int namelen = strlen(name);
431
432         if (namelen > DLM_LOCKSPACE_LEN)
433                 return -EINVAL;
434
435         if (!lvblen || (lvblen % 8))
436                 return -EINVAL;
437
438         if (!try_module_get(THIS_MODULE))
439                 return -EINVAL;
440
441         if (!dlm_user_daemon_available()) {
442                 log_print("dlm user daemon not available");
443                 error = -EUNATCH;
444                 goto out;
445         }
446
447         if (ops && ops_result) {
448                 if (!dlm_config.ci_recover_callbacks)
449                         *ops_result = -EOPNOTSUPP;
450                 else
451                         *ops_result = 0;
452         }
453
454         if (dlm_config.ci_recover_callbacks && cluster &&
455             strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
456                 log_print("dlm cluster name %s mismatch %s",
457                           dlm_config.ci_cluster_name, cluster);
458                 error = -EBADR;
459                 goto out;
460         }
461
462         error = 0;
463
464         spin_lock(&lslist_lock);
465         list_for_each_entry(ls, &lslist, ls_list) {
466                 WARN_ON(ls->ls_create_count <= 0);
467                 if (ls->ls_namelen != namelen)
468                         continue;
469                 if (memcmp(ls->ls_name, name, namelen))
470                         continue;
471                 if (flags & DLM_LSFL_NEWEXCL) {
472                         error = -EEXIST;
473                         break;
474                 }
475                 ls->ls_create_count++;
476                 *lockspace = ls;
477                 error = 1;
478                 break;
479         }
480         spin_unlock(&lslist_lock);
481
482         if (error)
483                 goto out;
484
485         error = -ENOMEM;
486
487         ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
488         if (!ls)
489                 goto out;
490         memcpy(ls->ls_name, name, namelen);
491         ls->ls_namelen = namelen;
492         ls->ls_lvblen = lvblen;
493         ls->ls_count = 0;
494         ls->ls_flags = 0;
495         ls->ls_scan_time = jiffies;
496
497         if (ops && dlm_config.ci_recover_callbacks) {
498                 ls->ls_ops = ops;
499                 ls->ls_ops_arg = ops_arg;
500         }
501
502         if (flags & DLM_LSFL_TIMEWARN)
503                 set_bit(LSFL_TIMEWARN, &ls->ls_flags);
504
505         /* ls_exflags are forced to match among nodes, and we don't
506            need to require all nodes to have some flags set */
507         ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
508                                     DLM_LSFL_NEWEXCL));
509
510         size = dlm_config.ci_rsbtbl_size;
511         ls->ls_rsbtbl_size = size;
512
513         ls->ls_rsbtbl = vmalloc(sizeof(struct dlm_rsbtable) * size);
514         if (!ls->ls_rsbtbl)
515                 goto out_lsfree;
516         for (i = 0; i < size; i++) {
517                 ls->ls_rsbtbl[i].keep.rb_node = NULL;
518                 ls->ls_rsbtbl[i].toss.rb_node = NULL;
519                 spin_lock_init(&ls->ls_rsbtbl[i].lock);
520         }
521
522         spin_lock_init(&ls->ls_remove_spin);
523
524         for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
525                 ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
526                                                  GFP_KERNEL);
527                 if (!ls->ls_remove_names[i])
528                         goto out_rsbtbl;
529         }
530
531         idr_init(&ls->ls_lkbidr);
532         spin_lock_init(&ls->ls_lkbidr_spin);
533
534         INIT_LIST_HEAD(&ls->ls_waiters);
535         mutex_init(&ls->ls_waiters_mutex);
536         INIT_LIST_HEAD(&ls->ls_orphans);
537         mutex_init(&ls->ls_orphans_mutex);
538         INIT_LIST_HEAD(&ls->ls_timeout);
539         mutex_init(&ls->ls_timeout_mutex);
540
541         INIT_LIST_HEAD(&ls->ls_new_rsb);
542         spin_lock_init(&ls->ls_new_rsb_spin);
543
544         INIT_LIST_HEAD(&ls->ls_nodes);
545         INIT_LIST_HEAD(&ls->ls_nodes_gone);
546         ls->ls_num_nodes = 0;
547         ls->ls_low_nodeid = 0;
548         ls->ls_total_weight = 0;
549         ls->ls_node_array = NULL;
550
551         memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
552         ls->ls_stub_rsb.res_ls = ls;
553
554         ls->ls_debug_rsb_dentry = NULL;
555         ls->ls_debug_waiters_dentry = NULL;
556
557         init_waitqueue_head(&ls->ls_uevent_wait);
558         ls->ls_uevent_result = 0;
559         init_completion(&ls->ls_members_done);
560         ls->ls_members_result = -1;
561
562         mutex_init(&ls->ls_cb_mutex);
563         INIT_LIST_HEAD(&ls->ls_cb_delay);
564
565         ls->ls_recoverd_task = NULL;
566         mutex_init(&ls->ls_recoverd_active);
567         spin_lock_init(&ls->ls_recover_lock);
568         spin_lock_init(&ls->ls_rcom_spin);
569         get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
570         ls->ls_recover_status = 0;
571         ls->ls_recover_seq = 0;
572         ls->ls_recover_args = NULL;
573         init_rwsem(&ls->ls_in_recovery);
574         init_rwsem(&ls->ls_recv_active);
575         INIT_LIST_HEAD(&ls->ls_requestqueue);
576         mutex_init(&ls->ls_requestqueue_mutex);
577         mutex_init(&ls->ls_clear_proc_locks);
578
579         ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
580         if (!ls->ls_recover_buf)
581                 goto out_lkbidr;
582
583         ls->ls_slot = 0;
584         ls->ls_num_slots = 0;
585         ls->ls_slots_size = 0;
586         ls->ls_slots = NULL;
587
588         INIT_LIST_HEAD(&ls->ls_recover_list);
589         spin_lock_init(&ls->ls_recover_list_lock);
590         idr_init(&ls->ls_recover_idr);
591         spin_lock_init(&ls->ls_recover_idr_lock);
592         ls->ls_recover_list_count = 0;
593         ls->ls_local_handle = ls;
594         init_waitqueue_head(&ls->ls_wait_general);
595         INIT_LIST_HEAD(&ls->ls_root_list);
596         init_rwsem(&ls->ls_root_sem);
597
598         spin_lock(&lslist_lock);
599         ls->ls_create_count = 1;
600         list_add(&ls->ls_list, &lslist);
601         spin_unlock(&lslist_lock);
602
603         if (flags & DLM_LSFL_FS) {
604                 error = dlm_callback_start(ls);
605                 if (error) {
606                         log_error(ls, "can't start dlm_callback %d", error);
607                         goto out_delist;
608                 }
609         }
610
611         init_waitqueue_head(&ls->ls_recover_lock_wait);
612
613         /*
614          * Once started, dlm_recoverd first looks for ls in lslist, then
615          * initializes ls_in_recovery as locked in "down" mode.  We need
616          * to wait for the wakeup from dlm_recoverd because in_recovery
617          * has to start out in down mode.
618          */
619
620         error = dlm_recoverd_start(ls);
621         if (error) {
622                 log_error(ls, "can't start dlm_recoverd %d", error);
623                 goto out_callback;
624         }
625
626         wait_event(ls->ls_recover_lock_wait,
627                    test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));
628
629         /* let kobject handle freeing of ls if there's an error */
630         do_unreg = 1;
631
632         ls->ls_kobj.kset = dlm_kset;
633         error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
634                                      "%s", ls->ls_name);
635         if (error)
636                 goto out_recoverd;
637         kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
638
639         /* This uevent triggers dlm_controld in userspace to add us to the
640            group of nodes that are members of this lockspace (managed by the
641            cluster infrastructure.)  Once it's done that, it tells us who the
642            current lockspace members are (via configfs) and then tells the
643            lockspace to start running (via sysfs) in dlm_ls_start(). */
644
645         error = do_uevent(ls, 1);
646         if (error)
647                 goto out_recoverd;
648
649         wait_for_completion(&ls->ls_members_done);
650         error = ls->ls_members_result;
651         if (error)
652                 goto out_members;
653
654         dlm_create_debug_file(ls);
655
656         log_rinfo(ls, "join complete");
657         *lockspace = ls;
658         return 0;
659
660  out_members:
661         do_uevent(ls, 0);
662         dlm_clear_members(ls);
663         kfree(ls->ls_node_array);
664  out_recoverd:
665         dlm_recoverd_stop(ls);
666  out_callback:
667         dlm_callback_stop(ls);
668  out_delist:
669         spin_lock(&lslist_lock);
670         list_del(&ls->ls_list);
671         spin_unlock(&lslist_lock);
672         idr_destroy(&ls->ls_recover_idr);
673         kfree(ls->ls_recover_buf);
674  out_lkbidr:
675         idr_destroy(&ls->ls_lkbidr);
676  out_rsbtbl:
677         for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
678                 if (ls->ls_remove_names[i])
679                         kfree(ls->ls_remove_names[i]);
680         }
681         vfree(ls->ls_rsbtbl);
682  out_lsfree:
683         if (do_unreg)
684                 kobject_put(&ls->ls_kobj);
685         else
686                 kfree(ls);
687  out:
688         module_put(THIS_MODULE);
689         return error;
690 }
691
692 int dlm_new_lockspace(const char *name, const char *cluster,
693                       uint32_t flags, int lvblen,
694                       const struct dlm_lockspace_ops *ops, void *ops_arg,
695                       int *ops_result, dlm_lockspace_t **lockspace)
696 {
697         int error = 0;
698
699         mutex_lock(&ls_lock);
700         if (!ls_count)
701                 error = threads_start();
702         if (error)
703                 goto out;
704
705         error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
706                               ops_result, lockspace);
707         if (!error)
708                 ls_count++;
709         if (error > 0)
710                 error = 0;
711         if (!ls_count)
712                 threads_stop();
713  out:
714         mutex_unlock(&ls_lock);
715         return error;
716 }
717
718 static int lkb_idr_is_local(int id, void *p, void *data)
719 {
720         struct dlm_lkb *lkb = p;
721
722         return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
723 }
724
/*
 * idr_for_each() callback that matches every entry: always returns 1,
 * so the iteration stops at the first lkb it sees.
 */
static int lkb_idr_is_any(int id, void *p, void *data)
{
	const int match = 1;

	return match;
}
729
730 static int lkb_idr_free(int id, void *p, void *data)
731 {
732         struct dlm_lkb *lkb = p;
733
734         if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
735                 dlm_free_lvb(lkb->lkb_lvbptr);
736
737         dlm_free_lkb(lkb);
738         return 0;
739 }
740
741 /* NOTE: We check the lkbidr here rather than the resource table.
742    This is because there may be LKBs queued as ASTs that have been unlinked
743    from their RSBs and are pending deletion once the AST has been delivered */
744
745 static int lockspace_busy(struct dlm_ls *ls, int force)
746 {
747         int rv;
748
749         spin_lock(&ls->ls_lkbidr_spin);
750         if (force == 0) {
751                 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
752         } else if (force == 1) {
753                 rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
754         } else {
755                 rv = 0;
756         }
757         spin_unlock(&ls->ls_lkbidr_spin);
758         return rv;
759 }
760
761 static int release_lockspace(struct dlm_ls *ls, int force)
762 {
763         struct dlm_rsb *rsb;
764         struct rb_node *n;
765         int i, busy, rv;
766
767         busy = lockspace_busy(ls, force);
768
769         spin_lock(&lslist_lock);
770         if (ls->ls_create_count == 1) {
771                 if (busy) {
772                         rv = -EBUSY;
773                 } else {
774                         /* remove_lockspace takes ls off lslist */
775                         ls->ls_create_count = 0;
776                         rv = 0;
777                 }
778         } else if (ls->ls_create_count > 1) {
779                 rv = --ls->ls_create_count;
780         } else {
781                 rv = -EINVAL;
782         }
783         spin_unlock(&lslist_lock);
784
785         if (rv) {
786                 log_debug(ls, "release_lockspace no remove %d", rv);
787                 return rv;
788         }
789
790         dlm_device_deregister(ls);
791
792         if (force < 3 && dlm_user_daemon_available())
793                 do_uevent(ls, 0);
794
795         dlm_recoverd_stop(ls);
796
797         dlm_callback_stop(ls);
798
799         remove_lockspace(ls);
800
801         dlm_delete_debug_file(ls);
802
803         idr_destroy(&ls->ls_recover_idr);
804         kfree(ls->ls_recover_buf);
805
806         /*
807          * Free all lkb's in idr
808          */
809
810         idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
811         idr_destroy(&ls->ls_lkbidr);
812
813         /*
814          * Free all rsb's on rsbtbl[] lists
815          */
816
817         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
818                 while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
819                         rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
820                         rb_erase(n, &ls->ls_rsbtbl[i].keep);
821                         dlm_free_rsb(rsb);
822                 }
823
824                 while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
825                         rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
826                         rb_erase(n, &ls->ls_rsbtbl[i].toss);
827                         dlm_free_rsb(rsb);
828                 }
829         }
830
831         vfree(ls->ls_rsbtbl);
832
833         for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
834                 kfree(ls->ls_remove_names[i]);
835
836         while (!list_empty(&ls->ls_new_rsb)) {
837                 rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
838                                        res_hashchain);
839                 list_del(&rsb->res_hashchain);
840                 dlm_free_rsb(rsb);
841         }
842
843         /*
844          * Free structures on any other lists
845          */
846
847         dlm_purge_requestqueue(ls);
848         kfree(ls->ls_recover_args);
849         dlm_clear_members(ls);
850         dlm_clear_members_gone(ls);
851         kfree(ls->ls_node_array);
852         log_rinfo(ls, "release_lockspace final free");
853         kobject_put(&ls->ls_kobj);
854         /* The ls structure will be freed when the kobject is done with */
855
856         module_put(THIS_MODULE);
857         return 0;
858 }
859
860 /*
861  * Called when a system has released all its locks and is not going to use the
862  * lockspace any longer.  We free everything we're managing for this lockspace.
863  * Remaining nodes will go through the recovery process as if we'd died.  The
864  * lockspace must continue to function as usual, participating in recoveries,
865  * until this returns.
866  *
867  * Force has 4 possible values:
868  * 0 - don't destroy locksapce if it has any LKBs
869  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
870  * 2 - destroy lockspace regardless of LKBs
871  * 3 - destroy lockspace as part of a forced shutdown
872  */
873
874 int dlm_release_lockspace(void *lockspace, int force)
875 {
876         struct dlm_ls *ls;
877         int error;
878
879         ls = dlm_find_lockspace_local(lockspace);
880         if (!ls)
881                 return -EINVAL;
882         dlm_put_lockspace(ls);
883
884         mutex_lock(&ls_lock);
885         error = release_lockspace(ls, force);
886         if (!error)
887                 ls_count--;
888         if (!ls_count)
889                 threads_stop();
890         mutex_unlock(&ls_lock);
891
892         return error;
893 }
894
895 void dlm_stop_lockspaces(void)
896 {
897         struct dlm_ls *ls;
898         int count;
899
900  restart:
901         count = 0;
902         spin_lock(&lslist_lock);
903         list_for_each_entry(ls, &lslist, ls_list) {
904                 if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
905                         count++;
906                         continue;
907                 }
908                 spin_unlock(&lslist_lock);
909                 log_error(ls, "no userland control daemon, stopping lockspace");
910                 dlm_ls_stop(ls);
911                 goto restart;
912         }
913         spin_unlock(&lslist_lock);
914
915         if (count)
916                 log_print("dlm user daemon left %d lockspaces", count);
917 }
918