// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

#include <linux/module.h>

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "dir.h"
#include "lowcomms.h"
#include "config.h"
#include "memory.h"
#include "lock.h"
#include "recover.h"
#include "requestqueue.h"
#include "user.h"
#include "ast.h"

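/* File-scope state: ls_count tracks how many lockspaces have been created
   through dlm_new_lockspace(), so the shared kernel threads can be started
   on the first create and stopped on the last release; lslist is the list
   of all lockspaces, protected by lslist_lock. */
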
static int			ls_count;
static struct mutex		ls_lock;
static struct list_head		lslist;
static spinlock_t		lslist_lock;
static struct task_struct *	scand_task;

static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	ssize_t ret = len;
	int n;
	int rc = kstrtoint(buf, 0, &n);

	if (rc)
		return rc;
	ls = dlm_find_lockspace_local(ls->ls_local_handle);
	if (!ls)
		return -EINVAL;

	switch (n) {
	case 0:
		dlm_ls_stop(ls);
		break;
	case 1:
		dlm_ls_start(ls);
		break;
	default:
		ret = -EINVAL;
	}
	dlm_put_lockspace(ls);
	return ret;
}

static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int rc = kstrtoint(buf, 0, &ls->ls_uevent_result);

	if (rc)
		return rc;
	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
	wake_up(&ls->ls_uevent_wait);
	return len;
}

static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
}

static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int rc = kstrtouint(buf, 0, &ls->ls_global_id);

	if (rc)
		return rc;
	return len;
}

static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
}

static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	int val;
	int rc = kstrtoint(buf, 0, &val);

	if (rc)
		return rc;
	if (val == 1)
		set_bit(LSFL_NODIR, &ls->ls_flags);
	return len;
}

static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
{
	uint32_t status = dlm_recover_status(ls);
	return snprintf(buf, PAGE_SIZE, "%x\n", status);
}

static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
{
	return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
}

struct dlm_attr {
	struct attribute attr;
	ssize_t (*show)(struct dlm_ls *, char *);
	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};

static struct dlm_attr dlm_attr_control = {
	.attr  = {.name = "control", .mode = S_IWUSR},
	.store = dlm_control_store
};

static struct dlm_attr dlm_attr_event = {
	.attr  = {.name = "event_done", .mode = S_IWUSR},
	.store = dlm_event_store
};

static struct dlm_attr dlm_attr_id = {
	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
	.show  = dlm_id_show,
	.store = dlm_id_store
};

static struct dlm_attr dlm_attr_nodir = {
	.attr  = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
	.show  = dlm_nodir_show,
	.store = dlm_nodir_store
};

static struct dlm_attr dlm_attr_recover_status = {
	.attr  = {.name = "recover_status", .mode = S_IRUGO},
	.show  = dlm_recover_status_show
};

static struct dlm_attr dlm_attr_recover_nodeid = {
	.attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
	.show  = dlm_recover_nodeid_show
};

static struct attribute *dlm_attrs[] = {
	&dlm_attr_control.attr,
	&dlm_attr_event.attr,
	&dlm_attr_id.attr,
	&dlm_attr_nodir.attr,
	&dlm_attr_recover_status.attr,
	&dlm_attr_recover_nodeid.attr,
	NULL,
};
ATTRIBUTE_GROUPS(dlm);

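/* Generic sysfs entry points; each dispatches to the per-attribute
   show/store handler in struct dlm_attr, if one is set. */
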
static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
			     char *buf)
{
	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
	return a->show ? a->show(ls, buf) : 0;
}

static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
			      const char *buf, size_t len)
{
	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
	struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
	return a->store ? a->store(ls, buf, len) : len;
}

static void lockspace_kobj_release(struct kobject *k)
{
	struct dlm_ls *ls = container_of(k, struct dlm_ls, ls_kobj);

	kfree(ls);
}

static const struct sysfs_ops dlm_attr_ops = {
	.show  = dlm_attr_show,
	.store = dlm_attr_store,
};

static struct kobj_type dlm_ktype = {
	.default_groups = dlm_groups,
	.sysfs_ops     = &dlm_attr_ops,
	.release       = lockspace_kobj_release,
};

static struct kset *dlm_kset;

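/* Tell dlm_controld about a lockspace join or leave via uevent, then block
   until userspace reports the result by writing to the "event_done" sysfs
   file (dlm_event_store above). */
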
static int do_uevent(struct dlm_ls *ls, int in)
{
	if (in)
		kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
	else
		kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);

	log_rinfo(ls, "%s the lockspace group...", in ? "joining" : "leaving");

	/* dlm_controld will see the uevent, do the necessary group management
	   and then write to sysfs to wake us */

	wait_event(ls->ls_uevent_wait,
		   test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));

	log_rinfo(ls, "group event done %d", ls->ls_uevent_result);

	return ls->ls_uevent_result;
}

static int dlm_uevent(struct kset *kset, struct kobject *kobj,
		      struct kobj_uevent_env *env)
{
	struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);

	add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
	return 0;
}

static const struct kset_uevent_ops dlm_uevent_ops = {
	.uevent = dlm_uevent,
};

int __init dlm_lockspace_init(void)
{
	ls_count = 0;
	mutex_init(&ls_lock);
	INIT_LIST_HEAD(&lslist);
	spin_lock_init(&lslist_lock);

	dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
	if (!dlm_kset) {
		printk(KERN_WARNING "%s: cannot create kset\n", __func__);
		return -ENOMEM;
	}
	return 0;
}

void dlm_lockspace_exit(void)
{
	kset_unregister(dlm_kset);
}

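/* The dlm_scand kthread periodically visits each lockspace whose scan
   interval has elapsed, aging out unused rsbs and checking for lock and
   waiter timeouts. */
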
static struct dlm_ls *find_ls_to_scan(void)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (time_after_eq(jiffies, ls->ls_scan_time +
					   dlm_config.ci_scan_secs * HZ)) {
			spin_unlock(&lslist_lock);
			return ls;
		}
	}
	spin_unlock(&lslist_lock);
	return NULL;
}

static int dlm_scand(void *data)
{
	struct dlm_ls *ls;

	while (!kthread_should_stop()) {
		ls = find_ls_to_scan();
		if (ls) {
			if (dlm_lock_recovery_try(ls)) {
				ls->ls_scan_time = jiffies;
				dlm_scan_rsbs(ls);
				dlm_scan_timeout(ls);
				dlm_scan_waiters(ls);
				dlm_unlock_recovery(ls);
			} else {
				ls->ls_scan_time += HZ;
			}
			continue;
		}
		schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
	}
	return 0;
}

static int dlm_scand_start(void)
{
	struct task_struct *p;
	int error = 0;

	p = kthread_run(dlm_scand, NULL, "dlm_scand");
	if (IS_ERR(p))
		error = PTR_ERR(p);
	else
		scand_task = p;
	return error;
}

static void dlm_scand_stop(void)
{
	kthread_stop(scand_task);
}

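/* Lockspace lookup helpers: each successful find takes a reference on the
   lockspace (ls_count), which the caller must drop with dlm_put_lockspace(). */
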
struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);

	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_global_id == id) {
			ls->ls_count++;
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock(&lslist_lock);
	return ls;
}

struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_local_handle == lockspace) {
			ls->ls_count++;
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock(&lslist_lock);
	return ls;
}

struct dlm_ls *dlm_find_lockspace_device(int minor)
{
	struct dlm_ls *ls;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (ls->ls_device.minor == minor) {
			ls->ls_count++;
			goto out;
		}
	}
	ls = NULL;
 out:
	spin_unlock(&lslist_lock);
	return ls;
}

void dlm_put_lockspace(struct dlm_ls *ls)
{
	spin_lock(&lslist_lock);
	ls->ls_count--;
	spin_unlock(&lslist_lock);
}

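/* Spin, sleeping between checks, until the last reference is dropped, then
   unlink the lockspace from lslist. */
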
static void remove_lockspace(struct dlm_ls *ls)
{
	for (;;) {
		spin_lock(&lslist_lock);
		if (ls->ls_count == 0) {
			WARN_ON(ls->ls_create_count != 0);
			list_del(&ls->ls_list);
			spin_unlock(&lslist_lock);
			return;
		}
		spin_unlock(&lslist_lock);
		ssleep(1);
	}
}

static int threads_start(void)
{
	int error;

	error = dlm_scand_start();
	if (error) {
		log_print("cannot start dlm_scand thread %d", error);
		goto fail;
	}

	/* Thread for sending/receiving messages for all lockspaces */
	error = dlm_lowcomms_start();
	if (error) {
		log_print("cannot start dlm lowcomms %d", error);
		goto scand_fail;
	}

	return 0;

 scand_fail:
	dlm_scand_stop();
 fail:
	return error;
}

static void threads_stop(void)
{
	dlm_scand_stop();
	dlm_lowcomms_stop();
}

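/* Create and initialize a new lockspace: allocate and set up the dlm_ls,
   add it to lslist, start the callback and recovery threads, then ask
   dlm_controld (via uevent) to join the lockspace group and wait for the
   initial recovery to complete. */
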
static int new_lockspace(const char *name, const char *cluster,
			 uint32_t flags, int lvblen,
			 const struct dlm_lockspace_ops *ops, void *ops_arg,
			 int *ops_result, dlm_lockspace_t **lockspace)
{
	struct dlm_ls *ls;
	int i, size, error;
	int do_unreg = 0;
	int namelen = strlen(name);

	if (namelen > DLM_LOCKSPACE_LEN || namelen == 0)
		return -EINVAL;

	if (!lvblen || (lvblen % 8))
		return -EINVAL;

	if (!try_module_get(THIS_MODULE))
		return -EINVAL;

	if (!dlm_user_daemon_available()) {
		log_print("dlm user daemon not available");
		error = -EUNATCH;
		goto out;
	}

	if (ops && ops_result) {
		if (!dlm_config.ci_recover_callbacks)
			*ops_result = -EOPNOTSUPP;
		else
			*ops_result = 0;
	}

	if (!cluster)
		log_print("dlm cluster name '%s' is being used without an application provided cluster name",
			  dlm_config.ci_cluster_name);

	if (dlm_config.ci_recover_callbacks && cluster &&
	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
		log_print("dlm cluster name '%s' does not match "
			  "the application cluster name '%s'",
			  dlm_config.ci_cluster_name, cluster);
		error = -EBADR;
		goto out;
	}

	error = 0;

	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		WARN_ON(ls->ls_create_count <= 0);
		if (ls->ls_namelen != namelen)
			continue;
		if (memcmp(ls->ls_name, name, namelen))
			continue;
		if (flags & DLM_LSFL_NEWEXCL) {
			error = -EEXIST;
			break;
		}
		ls->ls_create_count++;
		*lockspace = ls;
		error = 1;
		break;
	}
	spin_unlock(&lslist_lock);

	if (error)
		goto out;

	error = -ENOMEM;

	ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
	if (!ls)
		goto out;
	memcpy(ls->ls_name, name, namelen);
	ls->ls_namelen = namelen;
	ls->ls_lvblen = lvblen;
	ls->ls_count = 0;
	ls->ls_flags = 0;
	ls->ls_scan_time = jiffies;

	if (ops && dlm_config.ci_recover_callbacks) {
		ls->ls_ops = ops;
		ls->ls_ops_arg = ops_arg;
	}

	if (flags & DLM_LSFL_TIMEWARN)
		set_bit(LSFL_TIMEWARN, &ls->ls_flags);

	/* ls_exflags are forced to match among nodes, and we don't
	   need to require all nodes to have some flags set */
	ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
				    DLM_LSFL_NEWEXCL));

	size = dlm_config.ci_rsbtbl_size;
	ls->ls_rsbtbl_size = size;

	ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable)));
	if (!ls->ls_rsbtbl)
		goto out_lsfree;
	for (i = 0; i < size; i++) {
		ls->ls_rsbtbl[i].keep.rb_node = NULL;
		ls->ls_rsbtbl[i].toss.rb_node = NULL;
		spin_lock_init(&ls->ls_rsbtbl[i].lock);
	}

	spin_lock_init(&ls->ls_remove_spin);

	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
		ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1,
						 GFP_KERNEL);
		if (!ls->ls_remove_names[i])
			goto out_rsbtbl;
	}

	idr_init(&ls->ls_lkbidr);
	spin_lock_init(&ls->ls_lkbidr_spin);

	INIT_LIST_HEAD(&ls->ls_waiters);
	mutex_init(&ls->ls_waiters_mutex);
	INIT_LIST_HEAD(&ls->ls_orphans);
	mutex_init(&ls->ls_orphans_mutex);
	INIT_LIST_HEAD(&ls->ls_timeout);
	mutex_init(&ls->ls_timeout_mutex);

	INIT_LIST_HEAD(&ls->ls_new_rsb);
	spin_lock_init(&ls->ls_new_rsb_spin);

	INIT_LIST_HEAD(&ls->ls_nodes);
	INIT_LIST_HEAD(&ls->ls_nodes_gone);
	ls->ls_num_nodes = 0;
	ls->ls_low_nodeid = 0;
	ls->ls_total_weight = 0;
	ls->ls_node_array = NULL;

	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
	ls->ls_stub_rsb.res_ls = ls;

	ls->ls_debug_rsb_dentry = NULL;
	ls->ls_debug_waiters_dentry = NULL;

	init_waitqueue_head(&ls->ls_uevent_wait);
	ls->ls_uevent_result = 0;
	init_completion(&ls->ls_members_done);
	ls->ls_members_result = -1;

	mutex_init(&ls->ls_cb_mutex);
	INIT_LIST_HEAD(&ls->ls_cb_delay);

	ls->ls_recoverd_task = NULL;
	mutex_init(&ls->ls_recoverd_active);
	spin_lock_init(&ls->ls_recover_lock);
	spin_lock_init(&ls->ls_rcom_spin);
	get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
	ls->ls_recover_status = 0;
	ls->ls_recover_seq = 0;
	ls->ls_recover_args = NULL;
	init_rwsem(&ls->ls_in_recovery);
	init_rwsem(&ls->ls_recv_active);
	INIT_LIST_HEAD(&ls->ls_requestqueue);
	mutex_init(&ls->ls_requestqueue_mutex);
	mutex_init(&ls->ls_clear_proc_locks);

	ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
	if (!ls->ls_recover_buf)
		goto out_lkbidr;

	ls->ls_slot = 0;
	ls->ls_num_slots = 0;
	ls->ls_slots_size = 0;
	ls->ls_slots = NULL;

	INIT_LIST_HEAD(&ls->ls_recover_list);
	spin_lock_init(&ls->ls_recover_list_lock);
	idr_init(&ls->ls_recover_idr);
	spin_lock_init(&ls->ls_recover_idr_lock);
	ls->ls_recover_list_count = 0;
	ls->ls_local_handle = ls;
	init_waitqueue_head(&ls->ls_wait_general);
	INIT_LIST_HEAD(&ls->ls_root_list);
	init_rwsem(&ls->ls_root_sem);

	spin_lock(&lslist_lock);
	ls->ls_create_count = 1;
	list_add(&ls->ls_list, &lslist);
	spin_unlock(&lslist_lock);

	if (flags & DLM_LSFL_FS) {
		error = dlm_callback_start(ls);
		if (error) {
			log_error(ls, "can't start dlm_callback %d", error);
			goto out_delist;
		}
	}

	init_waitqueue_head(&ls->ls_recover_lock_wait);

	/*
	 * Once started, dlm_recoverd first looks for ls in lslist, then
	 * initializes ls_in_recovery as locked in "down" mode.  We need
	 * to wait for the wakeup from dlm_recoverd because in_recovery
	 * has to start out in down mode.
	 */

	error = dlm_recoverd_start(ls);
	if (error) {
		log_error(ls, "can't start dlm_recoverd %d", error);
		goto out_callback;
	}

	wait_event(ls->ls_recover_lock_wait,
		   test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags));

	/* let kobject handle freeing of ls if there's an error */
	do_unreg = 1;

	ls->ls_kobj.kset = dlm_kset;
	error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
				     "%s", ls->ls_name);
	if (error)
		goto out_recoverd;
	kobject_uevent(&ls->ls_kobj, KOBJ_ADD);

	/* This uevent triggers dlm_controld in userspace to add us to the
	   group of nodes that are members of this lockspace (managed by the
	   cluster infrastructure.)  Once it's done that, it tells us who the
	   current lockspace members are (via configfs) and then tells the
	   lockspace to start running (via sysfs) in dlm_ls_start(). */

	error = do_uevent(ls, 1);
	if (error)
		goto out_recoverd;

	wait_for_completion(&ls->ls_members_done);
	error = ls->ls_members_result;
	if (error)
		goto out_members;

	dlm_create_debug_file(ls);

	log_rinfo(ls, "join complete");
	*lockspace = ls;
	return 0;

 out_members:
	do_uevent(ls, 0);
	dlm_clear_members(ls);
	kfree(ls->ls_node_array);
 out_recoverd:
	dlm_recoverd_stop(ls);
 out_callback:
	dlm_callback_stop(ls);
 out_delist:
	spin_lock(&lslist_lock);
	list_del(&ls->ls_list);
	spin_unlock(&lslist_lock);
	idr_destroy(&ls->ls_recover_idr);
	kfree(ls->ls_recover_buf);
 out_lkbidr:
	idr_destroy(&ls->ls_lkbidr);
 out_rsbtbl:
	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
		kfree(ls->ls_remove_names[i]);
	vfree(ls->ls_rsbtbl);
 out_lsfree:
	if (do_unreg)
		kobject_put(&ls->ls_kobj);
	else
		kfree(ls);
 out:
	module_put(THIS_MODULE);
	return error;
}

int dlm_new_lockspace(const char *name, const char *cluster,
		      uint32_t flags, int lvblen,
		      const struct dlm_lockspace_ops *ops, void *ops_arg,
		      int *ops_result, dlm_lockspace_t **lockspace)
{
	int error = 0;

	mutex_lock(&ls_lock);
	if (!ls_count)
		error = threads_start();
	if (error)
		goto out;

	error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
			      ops_result, lockspace);
	if (!error)
		ls_count++;
	if (error > 0)
		error = 0;
	if (!ls_count)
		threads_stop();
 out:
	mutex_unlock(&ls_lock);
	return error;
}

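/* idr_for_each() callbacks used below to test for remaining lkbs
   (lockspace_busy) and to free them (release_lockspace). */
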
static int lkb_idr_is_local(int id, void *p, void *data)
{
	struct dlm_lkb *lkb = p;

	return lkb->lkb_nodeid == 0 && lkb->lkb_grmode != DLM_LOCK_IV;
}

static int lkb_idr_is_any(int id, void *p, void *data)
{
	return 1;
}

static int lkb_idr_free(int id, void *p, void *data)
{
	struct dlm_lkb *lkb = p;

	if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
		dlm_free_lvb(lkb->lkb_lvbptr);

	dlm_free_lkb(lkb);
	return 0;
}

/* NOTE: We check the lkbidr here rather than the resource table.
   This is because there may be LKBs queued as ASTs that have been unlinked
   from their RSBs and are pending deletion once the AST has been delivered */

static int lockspace_busy(struct dlm_ls *ls, int force)
{
	int rv;

	spin_lock(&ls->ls_lkbidr_spin);
	if (force == 0) {
		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls);
	} else if (force == 1) {
		rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_local, ls);
	} else {
		rv = 0;
	}
	spin_unlock(&ls->ls_lkbidr_spin);
	return rv;
}

static int release_lockspace(struct dlm_ls *ls, int force)
{
	struct dlm_rsb *rsb;
	struct rb_node *n;
	int i, busy, rv;

	busy = lockspace_busy(ls, force);

	spin_lock(&lslist_lock);
	if (ls->ls_create_count == 1) {
		if (busy) {
			rv = -EBUSY;
		} else {
			/* remove_lockspace takes ls off lslist */
			ls->ls_create_count = 0;
			rv = 0;
		}
	} else if (ls->ls_create_count > 1) {
		rv = --ls->ls_create_count;
	} else {
		rv = -EINVAL;
	}
	spin_unlock(&lslist_lock);

	if (rv) {
		log_debug(ls, "release_lockspace no remove %d", rv);
		return rv;
	}

	dlm_device_deregister(ls);

	if (force < 3 && dlm_user_daemon_available())
		do_uevent(ls, 0);

	dlm_recoverd_stop(ls);

	dlm_callback_stop(ls);

	remove_lockspace(ls);

	dlm_delete_debug_file(ls);

	idr_destroy(&ls->ls_recover_idr);
	kfree(ls->ls_recover_buf);

	/*
	 * Free all lkb's in idr
	 */

	idr_for_each(&ls->ls_lkbidr, lkb_idr_free, ls);
	idr_destroy(&ls->ls_lkbidr);

	/*
	 * Free all rsb's on rsbtbl[] lists
	 */

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
			rb_erase(n, &ls->ls_rsbtbl[i].keep);
			dlm_free_rsb(rsb);
		}

		while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
			rb_erase(n, &ls->ls_rsbtbl[i].toss);
			dlm_free_rsb(rsb);
		}
	}

	vfree(ls->ls_rsbtbl);

	for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++)
		kfree(ls->ls_remove_names[i]);

	while (!list_empty(&ls->ls_new_rsb)) {
		rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb,
				       res_hashchain);
		list_del(&rsb->res_hashchain);
		dlm_free_rsb(rsb);
	}

	/*
	 * Free structures on any other lists
	 */

	dlm_purge_requestqueue(ls);
	kfree(ls->ls_recover_args);
	dlm_clear_members(ls);
	dlm_clear_members_gone(ls);
	kfree(ls->ls_node_array);
	log_rinfo(ls, "release_lockspace final free");
	kobject_put(&ls->ls_kobj);
	/* The ls structure will be freed when the kobject is done with it */

	module_put(THIS_MODULE);
	return 0;
}

/*
 * Called when a system has released all its locks and is not going to use the
 * lockspace any longer.  We free everything we're managing for this lockspace.
 * Remaining nodes will go through the recovery process as if we'd died.  The
 * lockspace must continue to function as usual, participating in recoveries,
 * until this returns.
 *
 * Force has 4 possible values:
 * 0 - don't destroy lockspace if it has any LKBs
 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 * 2 - destroy lockspace regardless of LKBs
 * 3 - destroy lockspace as part of a forced shutdown
 */

int dlm_release_lockspace(void *lockspace, int force)
{
	struct dlm_ls *ls;
	int error;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;
	dlm_put_lockspace(ls);

	mutex_lock(&ls_lock);
	error = release_lockspace(ls, force);
	if (!error)
		ls_count--;
	if (!ls_count)
		threads_stop();
	mutex_unlock(&ls_lock);

	return error;
}

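/*
 * Illustrative pairing of the two entry points above (a sketch, not code
 * from this file); a kernel user such as a cluster filesystem might do:
 *
 *	dlm_lockspace_t *ls;
 *	int error;
 *
 *	error = dlm_new_lockspace("myfs", "mycluster", DLM_LSFL_FS, 32,
 *				  NULL, NULL, NULL, &ls);
 *	if (!error)
 *		...use the lockspace, then...
 *	dlm_release_lockspace(ls, 2);
 *
 * "myfs" and "mycluster" are made-up names; lvblen (32 here) must be a
 * nonzero multiple of 8, per the checks in new_lockspace().
 */
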
void dlm_stop_lockspaces(void)
{
	struct dlm_ls *ls;
	int count;

 restart:
	count = 0;
	spin_lock(&lslist_lock);
	list_for_each_entry(ls, &lslist, ls_list) {
		if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) {
			count++;
			continue;
		}
		spin_unlock(&lslist_lock);
		log_error(ls, "no userland control daemon, stopping lockspace");
		dlm_ls_stop(ls);
		goto restart;
	}
	spin_unlock(&lslist_lock);

	if (count)
		log_print("dlm user daemon left %d lockspaces", count);
}