// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
**
*******************************************************************************
******************************************************************************/
/* Central locking logic has four stages:

   Stage 1 (lock, unlock) is mainly about checking input args and
   splitting into one of the four main operations:

      dlm_lock          = request_lock
      dlm_lock+CONVERT  = convert_lock
      dlm_unlock        = unlock_lock
      dlm_unlock+CANCEL = cancel_lock

   Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
   provided to the next stage.

   Stage 3, _xxxx_lock(), determines if the operation is local or remote.
   When remote, it calls send_xxxx(), when local it calls do_xxxx().

   Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
   given rsb and lkb and queues callbacks.

   For remote operations, send_xxxx() results in the corresponding do_xxxx()
   function being executed on the remote node.  The connecting send/receive
   calls on local (L) and remote (R) nodes:

   L: send_xxxx()              ->  R: receive_xxxx()
   L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
*/
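/* An illustrative walk through the stages for a request whose rsb is
   mastered locally (a sketch of the naming scheme above, not extra
   control flow; the functions appear later in this file):

     dlm_lock()            stage 1: check args, no CONVERT flag
       request_lock()      stage 2: find_rsb() + lock_rsb()
         _request_lock()   stage 3: rsb is local, so no send_request()
           do_request()    stage 4: grant now, or queue on res_waitqueue

   With a remote master, stage 3 would call send_request() instead, and
   the result would come back through receive_request_reply(). */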
#include <trace/events/dlm.h>

#include <linux/types.h>
#include <linux/rbtree.h>
#include <linux/slab.h>
#include "dlm_internal.h"
#include <linux/dlm_device.h>
#include "requestqueue.h"
#include "lockspace.h"
#include "lvb_table.h"
static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_remove(struct dlm_rsb *r);
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
                                    struct dlm_message *ms);
static int receive_extralen(struct dlm_message *ms);
static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
static void del_timeout(struct dlm_lkb *lkb);
static void toss_rsb(struct kref *kref);
/*
 * Lock compatibility matrix - thanks Steve
 * UN = Unlocked state. Not really a state, used as a flag
 * PD = Padding. Used to make the matrix a nice power of two in size
 * Other states are the same as the VMS DLM.
 * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
 */
static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
        {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
        {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
        {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
        {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
        {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
/*
 * This defines the direction of transfer of LVB data.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 *  1 = LVB is returned to the caller
 *  0 = LVB is written to the resource
 * -1 = nothing happens to the LVB
 */
const int dlm_lvb_operations[8][8] = {
        /* UN   NL  CR  CW  PR  PW  EX  PD*/
        {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
        {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
        {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
        {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
        {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
        {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};
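/* Reading dlm_lvb_operations (an illustration derived from the table
 * above): a brand new request (grmode UN row) gets the resource's LVB
 * copied back to the caller (1), while a down-conversion from PW or EX,
 * the two modes allowed to modify an LVB, writes the caller's LVB into
 * the resource (0).  set_lvb_lock() below acts on exactly these values. */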
#define modes_compat(gr, rq) \
        __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]

int dlm_modes_compat(int mode1, int mode2)
{
        return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
}
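/* Example readings of the compatibility matrix (for illustration):
 *
 *   dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_PR) == 1  (shared readers)
 *   dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_EX) == 0  (PR blocks EX)
 *   dlm_modes_compat(DLM_LOCK_NL, DLM_LOCK_EX) == 1  (NL blocks nothing)
 */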
/*
 * Compatibility matrix for conversions with QUECVT set.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 */
static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
        {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
        {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
        {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
        {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
        {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
void dlm_print_lkb(struct dlm_lkb *lkb)
{
        printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
               "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
               lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
               lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
               lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
               (unsigned long long)lkb->lkb_recover_seq);
}
static void dlm_print_rsb(struct dlm_rsb *r)
{
        printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
               "rlc %d name %s\n",
               r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
               r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
               r->res_name);
}
void dlm_dump_rsb(struct dlm_rsb *r)
{
        struct dlm_lkb *lkb;

        dlm_print_rsb(r);

        printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
               list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
        printk(KERN_ERR "rsb lookup list\n");
        list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
                dlm_print_lkb(lkb);
        printk(KERN_ERR "rsb grant queue:\n");
        list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
                dlm_print_lkb(lkb);
        printk(KERN_ERR "rsb convert queue:\n");
        list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
                dlm_print_lkb(lkb);
        printk(KERN_ERR "rsb wait queue:\n");
        list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
                dlm_print_lkb(lkb);
}
/* Threads cannot use the lockspace while it's being recovered */

static inline void dlm_lock_recovery(struct dlm_ls *ls)
{
        down_read(&ls->ls_in_recovery);
}

void dlm_unlock_recovery(struct dlm_ls *ls)
{
        up_read(&ls->ls_in_recovery);
}

int dlm_lock_recovery_try(struct dlm_ls *ls)
{
        return down_read_trylock(&ls->ls_in_recovery);
}
static inline int can_be_queued(struct dlm_lkb *lkb)
{
        return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
}

static inline int force_blocking_asts(struct dlm_lkb *lkb)
{
        return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
}

static inline int is_demoted(struct dlm_lkb *lkb)
{
        return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
}

static inline int is_altmode(struct dlm_lkb *lkb)
{
        return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
}

static inline int is_granted(struct dlm_lkb *lkb)
{
        return (lkb->lkb_status == DLM_LKSTS_GRANTED);
}

static inline int is_remote(struct dlm_rsb *r)
{
        DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
        return !!r->res_nodeid;
}

static inline int is_process_copy(struct dlm_lkb *lkb)
{
        return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
}

static inline int is_master_copy(struct dlm_lkb *lkb)
{
        return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
}

static inline int middle_conversion(struct dlm_lkb *lkb)
{
        if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
            (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
                return 1;
        return 0;
}

static inline int down_conversion(struct dlm_lkb *lkb)
{
        return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
}
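/* Example: PR and CW are mutually incompatible but neither is stronger,
 * so a PR->CW or CW->PR conversion is a "middle" conversion: for
 * grmode PR, rqmode CW, middle_conversion() returns 1 and
 * down_conversion() returns 0, even though CW is numerically lower than
 * PR.  (Illustration of the two helpers above.) */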
static inline int is_overlap_unlock(struct dlm_lkb *lkb)
{
        return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
}

static inline int is_overlap_cancel(struct dlm_lkb *lkb)
{
        return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
}

static inline int is_overlap(struct dlm_lkb *lkb)
{
        return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
                                  DLM_IFL_OVERLAP_CANCEL));
}
static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
        if (is_master_copy(lkb))
                return;

        del_timeout(lkb);

        DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););

#ifdef CONFIG_DLM_DEPRECATED_API
        /* if the operation was a cancel, then return -DLM_ECANCEL, if a
           timeout caused the cancel then return -ETIMEDOUT */
        if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
                lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
                rv = -ETIMEDOUT;
        }
#endif

        if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
                lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
                rv = -EDEADLK;
        }

        dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
}
static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        queue_cast(r, lkb,
                   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
}
static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
{
        if (is_master_copy(lkb)) {
                send_bast(r, lkb, rqmode);
        } else {
                dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
        }
}
/*
 * Basic operations on rsb's and lkb's
 */

/* This is only called to add a reference when the code already holds
   a valid reference to the rsb, so there's no need for locking. */

static inline void hold_rsb(struct dlm_rsb *r)
{
        kref_get(&r->res_ref);
}

void dlm_hold_rsb(struct dlm_rsb *r)
{
        hold_rsb(r);
}
/* When all references to the rsb are gone it's transferred to
   the tossed list for later disposal. */

static void put_rsb(struct dlm_rsb *r)
{
        struct dlm_ls *ls = r->res_ls;
        uint32_t bucket = r->res_bucket;
        int rv;

        rv = kref_put_lock(&r->res_ref, toss_rsb,
                           &ls->ls_rsbtbl[bucket].lock);
        if (rv)
                spin_unlock(&ls->ls_rsbtbl[bucket].lock);
}

void dlm_put_rsb(struct dlm_rsb *r)
{
        put_rsb(r);
}
static int pre_rsb_struct(struct dlm_ls *ls)
{
        struct dlm_rsb *r1, *r2;
        int count = 0;

        spin_lock(&ls->ls_new_rsb_spin);
        if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
                spin_unlock(&ls->ls_new_rsb_spin);
                return 0;
        }
        spin_unlock(&ls->ls_new_rsb_spin);

        r1 = dlm_allocate_rsb(ls);
        r2 = dlm_allocate_rsb(ls);

        spin_lock(&ls->ls_new_rsb_spin);
        if (r1) {
                list_add(&r1->res_hashchain, &ls->ls_new_rsb);
                ls->ls_new_rsb_count++;
        }
        if (r2) {
                list_add(&r2->res_hashchain, &ls->ls_new_rsb);
                ls->ls_new_rsb_count++;
        }
        count = ls->ls_new_rsb_count;
        spin_unlock(&ls->ls_new_rsb_spin);

        if (!count)
                return -ENOMEM;
        return 0;
}
/* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
   unlock any spinlocks, go back and call pre_rsb_struct again.
   Otherwise, take an rsb off the list and return it. */
static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
                          struct dlm_rsb **r_ret)
{
        struct dlm_rsb *r;
        int count;

        spin_lock(&ls->ls_new_rsb_spin);
        if (list_empty(&ls->ls_new_rsb)) {
                count = ls->ls_new_rsb_count;
                spin_unlock(&ls->ls_new_rsb_spin);
                log_debug(ls, "find_rsb retry %d %d %s",
                          count, dlm_config.ci_new_rsb_count, name);
                return -EAGAIN;
        }

        r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
        list_del(&r->res_hashchain);
        /* Convert the empty list_head to a NULL rb_node for tree usage: */
        memset(&r->res_hashnode, 0, sizeof(struct rb_node));
        ls->ls_new_rsb_count--;
        spin_unlock(&ls->ls_new_rsb_spin);

        r->res_ls = ls;
        r->res_length = len;
        memcpy(r->res_name, name, len);
        mutex_init(&r->res_mutex);

        INIT_LIST_HEAD(&r->res_lookup);
        INIT_LIST_HEAD(&r->res_grantqueue);
        INIT_LIST_HEAD(&r->res_convertqueue);
        INIT_LIST_HEAD(&r->res_waitqueue);
        INIT_LIST_HEAD(&r->res_root_list);
        INIT_LIST_HEAD(&r->res_recover_list);

        *r_ret = r;
        return 0;
}
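/* The two functions above are meant to be used together in a retry
 * loop, as find_rsb_dir() below does: preallocate outside the bucket
 * spinlock, then consume under it (a sketch of that pattern):
 *
 *  retry:
 *      error = pre_rsb_struct(ls);     // may allocate, no locks held
 *      if (error < 0)
 *              goto out;
 *      spin_lock(&ls->ls_rsbtbl[b].lock);
 *      ...
 *      error = get_rsb_struct(ls, name, len, &r);
 *      if (error == -EAGAIN) {         // reserve ran dry, refill it
 *              spin_unlock(&ls->ls_rsbtbl[b].lock);
 *              goto retry;
 *      }
 */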
static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
{
        char maxname[DLM_RESNAME_MAXLEN];

        memset(maxname, 0, DLM_RESNAME_MAXLEN);
        memcpy(maxname, name, nlen);
        return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
}
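/* rsb_cmp() zero-pads the shorter key out to DLM_RESNAME_MAXLEN before
 * the memcmp(), so variable-length resource names compare under one
 * total order; e.g. a 3-byte name "foo" is compared as "foo\0\0..."
 * against the full-width res_name.  (Note on the helper above.) */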
int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
                        struct dlm_rsb **r_ret)
{
        struct rb_node *node = tree->rb_node;
        struct dlm_rsb *r;
        int rc;

        while (node) {
                r = rb_entry(node, struct dlm_rsb, res_hashnode);
                rc = rsb_cmp(r, name, len);
                if (rc < 0)
                        node = node->rb_left;
                else if (rc > 0)
                        node = node->rb_right;
                else
                        goto found;
        }
        *r_ret = NULL;
        return -EBADR;

 found:
        *r_ret = r;
        return 0;
}
static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
{
        struct rb_node **newn = &tree->rb_node;
        struct rb_node *parent = NULL;
        int rc;

        while (*newn) {
                struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
                                               res_hashnode);

                parent = *newn;
                rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
                if (rc < 0)
                        newn = &parent->rb_left;
                else if (rc > 0)
                        newn = &parent->rb_right;
                else {
                        log_print("rsb_insert match");
                        dlm_dump_rsb(rsb);
                        dlm_dump_rsb(cur);
                        return -EEXIST;
                }
        }

        rb_link_node(&rsb->res_hashnode, parent, newn);
        rb_insert_color(&rsb->res_hashnode, tree);
        return 0;
}
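/* Typical pairing of the two tree helpers (a sketch mirroring the
 * find_rsb_*() functions below, under the per-bucket spinlock):
 *
 *      spin_lock(&ls->ls_rsbtbl[b].lock);
 *      error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
 *      if (error == -EBADR) {
 *              // not in the keep tree; try toss, or insert a new rsb
 *              error = get_rsb_struct(ls, name, len, &r);
 *              if (!error)
 *                      error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
 *      }
 *      spin_unlock(&ls->ls_rsbtbl[b].lock);
 */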
/*
 * Find rsb in rsbtbl and potentially create/add one
 *
 * Delaying the release of rsb's has a similar benefit to applications keeping
 * NL locks on an rsb, but without the guarantee that the cached master value
 * will still be valid when the rsb is reused.  Apps aren't always smart enough
 * to keep NL locks on an rsb that they may lock again shortly; this can lead
 * to excessive master lookups and removals if we don't delay the release.
 *
 * Searching for an rsb means looking through both the normal list and toss
 * list.  When found on the toss list the rsb is moved to the normal list with
 * ref count of 1; when found on normal list the ref count is incremented.
 *
 * rsb's on the keep list are being used locally and refcounted.
 * rsb's on the toss list are not being used locally, and are not refcounted.
 *
 * The toss list rsb's were either
 * - previously used locally but not any more (were on keep list, then
 *   moved to toss list when last refcount dropped)
 * - created and put on toss list as a directory record for a lookup
 *   (we are the dir node for the res, but are not using the res right now,
 *   but some other node is)
 *
 * The purpose of find_rsb() is to return a refcounted rsb for local use.
 * So, if the given rsb is on the toss list, it is moved to the keep list
 * before being returned.
 *
 * toss_rsb() happens when all local usage of the rsb is done, i.e. no
 * more refcounts exist, so the rsb is moved from the keep list to the
 * toss list.
 *
 * rsb's on both keep and toss lists are used for doing name to master
 * lookups.  rsb's that are in use locally (and being refcounted) are on
 * the keep list, rsb's that are not in use locally (not refcounted) and
 * only exist for name/master lookups are on the toss list.
 *
 * rsb's on the toss list whose dir_nodeid is not local can have stale
 * name/master mappings.  So, remote requests on such rsb's can potentially
 * return with an error, which means the mapping is stale and needs to
 * be updated with a new lookup.  (The idea behind MASTER UNCERTAIN and
 * first_lkid is to keep only a single outstanding request on an rsb
 * while that rsb has a potentially stale master.)
 */
static int find_rsb_dir(struct dlm_ls *ls, char *name, int len,
                        uint32_t hash, uint32_t b,
                        int dir_nodeid, int from_nodeid,
                        unsigned int flags, struct dlm_rsb **r_ret)
{
        struct dlm_rsb *r = NULL;
        int our_nodeid = dlm_our_nodeid();
        int from_local = 0;
        int from_other = 0;
        int from_dir = 0;
        int create = 0;
        int error;

        if (flags & R_RECEIVE_REQUEST) {
                if (from_nodeid == dir_nodeid)
                        from_dir = 1;
                else
                        from_other = 1;
        } else if (flags & R_REQUEST) {
                from_local = 1;
        }

        /*
         * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
         * from_nodeid has sent us a lock in dlm_recover_locks, believing
         * we're the new master.  Our local recovery may not have set
         * res_master_nodeid to our_nodeid yet, so allow either.  Don't
         * create the rsb; dlm_recover_process_copy() will handle EBADR
         * by resending.
         *
         * If someone sends us a request, we are the dir node, and we do
         * not find the rsb anywhere, then recreate it.  This happens if
         * someone sends us a request after we have removed/freed an rsb
         * from our toss list.  (They sent a request instead of lookup
         * because they are using an rsb from their toss list.)
         */

        if (from_local || from_dir ||
            (from_other && (dir_nodeid == our_nodeid))) {
                create = 1;
        }
 retry:
        error = pre_rsb_struct(ls);
        if (error < 0)
                goto out;

        spin_lock(&ls->ls_rsbtbl[b].lock);

        error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
        if (error)
                goto do_toss;

        /*
         * rsb is active, so we can't check master_nodeid without lock_rsb.
         */

        kref_get(&r->res_ref);
        goto out_unlock;

 do_toss:
        error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
        if (error)
                goto do_new;

        /*
         * rsb found inactive (master_nodeid may be out of date unless
         * we are the dir_nodeid or were the master)  No other thread
         * is using this rsb because it's on the toss list, so we can
         * look at or update res_master_nodeid without lock_rsb.
         */

        if ((r->res_master_nodeid != our_nodeid) && from_other) {
                /* our rsb was not master, and another node (not the dir node)
                   has sent us a request */
                log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s",
                          from_nodeid, r->res_master_nodeid, dir_nodeid,
                          r->res_name);
                error = -ENOTBLK;
                goto out_unlock;
        }

        if ((r->res_master_nodeid != our_nodeid) && from_dir) {
                /* don't think this should ever happen */
                log_error(ls, "find_rsb toss from_dir %d master %d",
                          from_nodeid, r->res_master_nodeid);
                dlm_print_rsb(r);
                /* fix it and go on */
                r->res_master_nodeid = our_nodeid;
                r->res_nodeid = 0;
                rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
                r->res_first_lkid = 0;
        }

        if (from_local && (r->res_master_nodeid != our_nodeid)) {
                /* Because we have held no locks on this rsb,
                   res_master_nodeid could have become stale. */
                rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
                r->res_first_lkid = 0;
        }

        rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
        error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
        goto out_unlock;
 do_new:
        if (error == -EBADR && !create)
                goto out_unlock;

        error = get_rsb_struct(ls, name, len, &r);
        if (error == -EAGAIN) {
                spin_unlock(&ls->ls_rsbtbl[b].lock);
                goto retry;
        }
        if (error)
                goto out_unlock;

        r->res_hash = hash;
        r->res_bucket = b;
        r->res_dir_nodeid = dir_nodeid;
        kref_init(&r->res_ref);

        if (from_dir) {
                /* want to see how often this happens */
                log_debug(ls, "find_rsb new from_dir %d recreate %s",
                          from_nodeid, r->res_name);
                r->res_master_nodeid = our_nodeid;
                r->res_nodeid = 0;
                goto out_add;
        }

        if (from_other && (dir_nodeid != our_nodeid)) {
                /* should never happen */
                log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
                          from_nodeid, dir_nodeid, our_nodeid, r->res_name);
                dlm_free_rsb(r);
                r = NULL;
                error = -ENOTBLK;
                goto out_unlock;
        }

        if (from_other) {
                log_debug(ls, "find_rsb new from_other %d dir %d %s",
                          from_nodeid, dir_nodeid, r->res_name);
        }

        if (dir_nodeid == our_nodeid) {
                /* When we are the dir nodeid, we can set the master
                   node directly */
                r->res_master_nodeid = our_nodeid;
                r->res_nodeid = 0;
        } else {
                /* set_master will send_lookup to dir_nodeid */
                r->res_master_nodeid = 0;
                r->res_nodeid = -1;
        }

 out_add:
        error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
 out_unlock:
        spin_unlock(&ls->ls_rsbtbl[b].lock);
 out:
        *r_ret = r;
        return error;
}
/* During recovery, other nodes can send us new MSTCPY locks (from
   dlm_recover_locks) before we've made ourselves master (in
   dlm_recover_masters). */
static int find_rsb_nodir(struct dlm_ls *ls, char *name, int len,
                          uint32_t hash, uint32_t b,
                          int dir_nodeid, int from_nodeid,
                          unsigned int flags, struct dlm_rsb **r_ret)
{
        struct dlm_rsb *r = NULL;
        int our_nodeid = dlm_our_nodeid();
        int recover = (flags & R_RECEIVE_RECOVER);
        int error;

 retry:
        error = pre_rsb_struct(ls);
        if (error < 0)
                goto out;

        spin_lock(&ls->ls_rsbtbl[b].lock);

        error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
        if (error)
                goto do_toss;

        /*
         * rsb is active, so we can't check master_nodeid without lock_rsb.
         */

        kref_get(&r->res_ref);
        goto out_unlock;

 do_toss:
        error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
        if (error)
                goto do_new;

        /*
         * rsb found inactive.  No other thread is using this rsb because
         * it's on the toss list, so we can look at or update
         * res_master_nodeid without lock_rsb.
         */

        if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
                /* our rsb is not master, and another node has sent us a
                   request; this should never happen */
                log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d",
                          from_nodeid, r->res_master_nodeid, dir_nodeid);
                dlm_print_rsb(r);
                error = -ENOTBLK;
                goto out_unlock;
        }

        if (!recover && (r->res_master_nodeid != our_nodeid) &&
            (dir_nodeid == our_nodeid)) {
                /* our rsb is not master, and we are dir; may as well fix it;
                   this should never happen */
                log_error(ls, "find_rsb toss our %d master %d dir %d",
                          our_nodeid, r->res_master_nodeid, dir_nodeid);
                dlm_print_rsb(r);
                r->res_master_nodeid = our_nodeid;
                r->res_nodeid = 0;
        }

        rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
        error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
        goto out_unlock;

 do_new:
        error = get_rsb_struct(ls, name, len, &r);
        if (error == -EAGAIN) {
                spin_unlock(&ls->ls_rsbtbl[b].lock);
                goto retry;
        }
        if (error)
                goto out_unlock;

        r->res_hash = hash;
        r->res_bucket = b;
        r->res_dir_nodeid = dir_nodeid;
        r->res_master_nodeid = dir_nodeid;
        r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
        kref_init(&r->res_ref);

        error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
 out_unlock:
        spin_unlock(&ls->ls_rsbtbl[b].lock);
 out:
        *r_ret = r;
        return error;
}
static int find_rsb(struct dlm_ls *ls, char *name, int len, int from_nodeid,
                    unsigned int flags, struct dlm_rsb **r_ret)
{
        uint32_t hash, b;
        int dir_nodeid;

        if (len > DLM_RESNAME_MAXLEN)
                return -EINVAL;

        hash = jhash(name, len, 0);
        b = hash & (ls->ls_rsbtbl_size - 1);

        dir_nodeid = dlm_hash2nodeid(ls, hash);

        if (dlm_no_directory(ls))
                return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid,
                                      from_nodeid, flags, r_ret);
        return find_rsb_dir(ls, name, len, hash, b, dir_nodeid,
                            from_nodeid, flags, r_ret);
}
/* we have received a request and found that res_master_nodeid != our_nodeid,
   so we need to return an error or make ourselves the master */
static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
                                  int from_nodeid)
{
        if (dlm_no_directory(ls)) {
                log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
                          from_nodeid, r->res_master_nodeid,
                          r->res_dir_nodeid);
                dlm_print_rsb(r);
                return -ENOTBLK;
        }

        if (from_nodeid != r->res_dir_nodeid) {
                /* our rsb is not master, and another node (not the dir node)
                   has sent us a request.  this is much more common when our
                   master_nodeid is zero, so limit debug to non-zero.  */

                if (r->res_master_nodeid) {
                        log_debug(ls, "validate master from_other %d master %d "
                                  "dir %d first %x %s", from_nodeid,
                                  r->res_master_nodeid, r->res_dir_nodeid,
                                  r->res_first_lkid, r->res_name);
                }
                return -ENOTBLK;
        } else {
                /* our rsb is not master, but the dir nodeid has sent us a
                   request; this could happen with master 0 / res_nodeid -1 */

                if (r->res_master_nodeid) {
                        log_error(ls, "validate master from_dir %d master %d "
                                  "first %x %s",
                                  from_nodeid, r->res_master_nodeid,
                                  r->res_first_lkid, r->res_name);
                }

                r->res_master_nodeid = dlm_our_nodeid();
                r->res_nodeid = 0;
                return 0;
        }
}
static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid,
                                int from_nodeid, bool toss_list, unsigned int flags,
                                int *r_nodeid, int *result)
{
        int fix_master = (flags & DLM_LU_RECOVER_MASTER);
        int from_master = (flags & DLM_LU_RECOVER_DIR);

        if (r->res_dir_nodeid != our_nodeid) {
                /* should not happen, but may as well fix it and carry on */
                log_error(ls, "%s res_dir %d our %d %s", __func__,
                          r->res_dir_nodeid, our_nodeid, r->res_name);
                r->res_dir_nodeid = our_nodeid;
        }

        if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) {
                /* Recovery uses this function to set a new master when
                 * the previous master failed.  Setting NEW_MASTER will
                 * force dlm_recover_masters to call recover_master on this
                 * rsb even though the res_nodeid is no longer removed.
                 */
                r->res_master_nodeid = from_nodeid;
                r->res_nodeid = from_nodeid;
                rsb_set_flag(r, RSB_NEW_MASTER);

                if (toss_list) {
                        /* I don't think we should ever find it on the toss list. */
                        log_error(ls, "%s fix_master on toss", __func__);
                        dlm_dump_rsb(r);
                }
        }

        if (from_master && (r->res_master_nodeid != from_nodeid)) {
                /* this will happen if from_nodeid became master during
                 * a previous recovery cycle, and we aborted the previous
                 * cycle before recovering this master value
                 */
                log_limit(ls, "%s from_master %d master_nodeid %d res_nodeid %d first %x %s",
                          __func__, from_nodeid, r->res_master_nodeid,
                          r->res_nodeid, r->res_first_lkid, r->res_name);

                if (r->res_master_nodeid == our_nodeid) {
                        log_error(ls, "from_master %d our_master", from_nodeid);
                        dlm_dump_rsb(r);
                        goto ret_assign;
                }

                r->res_master_nodeid = from_nodeid;
                r->res_nodeid = from_nodeid;
                rsb_set_flag(r, RSB_NEW_MASTER);
        }

        if (!r->res_master_nodeid) {
                /* this will happen if recovery happens while we're looking
                 * up the master for this rsb
                 */
                log_debug(ls, "%s master 0 to %d first %x %s", __func__,
                          from_nodeid, r->res_first_lkid, r->res_name);
                r->res_master_nodeid = from_nodeid;
                r->res_nodeid = from_nodeid;
        }

        if (!from_master && !fix_master &&
            (r->res_master_nodeid == from_nodeid)) {
                /* this can happen when the master sends remove, the dir node
                 * finds the rsb on the keep list and ignores the remove,
                 * and the former master sends a lookup
                 */
                log_limit(ls, "%s from master %d flags %x first %x %s",
                          __func__, from_nodeid, flags, r->res_first_lkid,
                          r->res_name);
        }

 ret_assign:
        *r_nodeid = r->res_master_nodeid;
        if (result)
                *result = DLM_LU_MATCH;
}
/*
 * We're the dir node for this res and another node wants to know the
 * master nodeid.  During normal operation (non recovery) this is only
 * called from receive_lookup(); master lookups when the local node is
 * the dir node are done by find_rsb().
 *
 * normal operation, we are the dir node for a resource
 * . dlm_master_lookup flags 0
 *
 * recover directory, we are rebuilding dir for all resources
 * . dlm_recover_directory
 *   remote node sends back the rsb names it is master of and we are dir of
 * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
 *   we either create new rsb setting remote node as master, or find existing
 *   rsb and set master to be the remote node.
 *
 * recover masters, we are finding the new master for resources
 * . dlm_recover_masters
 * . dlm_send_rcom_lookup
 * . receive_rcom_lookup
 * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
 */
int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len,
                      unsigned int flags, int *r_nodeid, int *result)
{
        struct dlm_rsb *r = NULL;
        uint32_t hash, b;
        int our_nodeid = dlm_our_nodeid();
        int dir_nodeid, error;

        if (len > DLM_RESNAME_MAXLEN)
                return -EINVAL;

        if (from_nodeid == our_nodeid) {
                log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
                          our_nodeid, flags);
                return -EINVAL;
        }

        hash = jhash(name, len, 0);
        b = hash & (ls->ls_rsbtbl_size - 1);

        dir_nodeid = dlm_hash2nodeid(ls, hash);
        if (dir_nodeid != our_nodeid) {
                log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
                          from_nodeid, dir_nodeid, our_nodeid, hash,
                          ls->ls_num_nodes);
                *r_nodeid = -1;
                return -EINVAL;
        }

 retry:
        error = pre_rsb_struct(ls);
        if (error < 0)
                return error;

        spin_lock(&ls->ls_rsbtbl[b].lock);
        error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
        if (!error) {
                /* because the rsb is active, we need to lock_rsb before
                 * checking/changing res_master_nodeid
                 */

                hold_rsb(r);
                spin_unlock(&ls->ls_rsbtbl[b].lock);
                lock_rsb(r);

                __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
                                    flags, r_nodeid, result);

                /* the rsb was active */
                unlock_rsb(r);
                put_rsb(r);

                return 0;
        }

        error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
        if (error)
                goto not_found;

        /* because the rsb is inactive (on toss list), it's not refcounted
         * and lock_rsb is not used, but is protected by the rsbtbl lock
         */

        __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags,
                            r_nodeid, result);

        r->res_toss_time = jiffies;
        /* the rsb was inactive (on toss list) */
        spin_unlock(&ls->ls_rsbtbl[b].lock);

        return 0;

 not_found:
        error = get_rsb_struct(ls, name, len, &r);
        if (error == -EAGAIN) {
                spin_unlock(&ls->ls_rsbtbl[b].lock);
                goto retry;
        }
        if (error)
                goto out_unlock;

        r->res_hash = hash;
        r->res_bucket = b;
        r->res_dir_nodeid = our_nodeid;
        r->res_master_nodeid = from_nodeid;
        r->res_nodeid = from_nodeid;
        kref_init(&r->res_ref);
        r->res_toss_time = jiffies;

        error = rsb_insert(r, &ls->ls_rsbtbl[b].toss);
        if (error) {
                /* should never happen */
                dlm_free_rsb(r);
                spin_unlock(&ls->ls_rsbtbl[b].lock);
                goto retry;
        }

        if (result)
                *result = DLM_LU_ADD;
        *r_nodeid = from_nodeid;
 out_unlock:
        spin_unlock(&ls->ls_rsbtbl[b].lock);
        return error;
}
static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
{
        struct rb_node *n;
        struct dlm_rsb *r;
        int i;

        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
                spin_lock(&ls->ls_rsbtbl[i].lock);
                for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
                        r = rb_entry(n, struct dlm_rsb, res_hashnode);
                        if (r->res_hash == hash)
                                dlm_dump_rsb(r);
                }
                spin_unlock(&ls->ls_rsbtbl[i].lock);
        }
}
void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len)
{
        struct dlm_rsb *r = NULL;
        uint32_t hash, b;
        int error;

        hash = jhash(name, len, 0);
        b = hash & (ls->ls_rsbtbl_size - 1);

        spin_lock(&ls->ls_rsbtbl[b].lock);
        error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
        if (!error)
                goto out_dump;

        error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
        if (error)
                goto out;

 out_dump:
        dlm_dump_rsb(r);
 out:
        spin_unlock(&ls->ls_rsbtbl[b].lock);
}
static void toss_rsb(struct kref *kref)
{
        struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
        struct dlm_ls *ls = r->res_ls;

        DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
        kref_init(&r->res_ref);
        rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
        rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
        r->res_toss_time = jiffies;
        ls->ls_rsbtbl[r->res_bucket].flags |= DLM_RTF_SHRINK;
        if (r->res_lvbptr) {
                dlm_free_lvb(r->res_lvbptr);
                r->res_lvbptr = NULL;
        }
}
/* See comment for unhold_lkb */

static void unhold_rsb(struct dlm_rsb *r)
{
        int rv;

        rv = kref_put(&r->res_ref, toss_rsb);
        DLM_ASSERT(!rv, dlm_dump_rsb(r););
}
static void kill_rsb(struct kref *kref)
{
        struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);

        /* All work is done after the return from kref_put() so we
           can release the write_lock before the remove and free. */

        DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
        DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
        DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
        DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
        DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
        DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
}
/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
   The rsb must exist as long as any lkb's for it do. */

static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        hold_rsb(r);
        lkb->lkb_resource = r;
}

static void detach_lkb(struct dlm_lkb *lkb)
{
        if (lkb->lkb_resource) {
                put_rsb(lkb->lkb_resource);
                lkb->lkb_resource = NULL;
        }
}
static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
                       int start, int end)
{
        struct dlm_lkb *lkb;
        int rv;

        lkb = dlm_allocate_lkb(ls);
        if (!lkb)
                return -ENOMEM;

        lkb->lkb_nodeid = -1;
        lkb->lkb_grmode = DLM_LOCK_IV;
        kref_init(&lkb->lkb_ref);
        INIT_LIST_HEAD(&lkb->lkb_ownqueue);
        INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
#ifdef CONFIG_DLM_DEPRECATED_API
        INIT_LIST_HEAD(&lkb->lkb_time_list);
#endif
        INIT_LIST_HEAD(&lkb->lkb_cb_list);
        mutex_init(&lkb->lkb_cb_mutex);
        INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);

        idr_preload(GFP_NOFS);
        spin_lock(&ls->ls_lkbidr_spin);
        rv = idr_alloc(&ls->ls_lkbidr, lkb, start, end, GFP_NOWAIT);
        if (rv >= 0)
                lkb->lkb_id = rv;
        spin_unlock(&ls->ls_lkbidr_spin);
        idr_preload_end();

        if (rv < 0) {
                log_error(ls, "create_lkb idr error %d", rv);
                dlm_free_lkb(lkb);
                return rv;
        }

        *lkb_ret = lkb;
        return 0;
}

static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
        return _create_lkb(ls, lkb_ret, 1, 0);
}
static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
        struct dlm_lkb *lkb;

        spin_lock(&ls->ls_lkbidr_spin);
        lkb = idr_find(&ls->ls_lkbidr, lkid);
        if (lkb)
                kref_get(&lkb->lkb_ref);
        spin_unlock(&ls->ls_lkbidr_spin);

        *lkb_ret = lkb;
        return lkb ? 0 : -ENOENT;
}
static void kill_lkb(struct kref *kref)
{
        struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

        /* All work is done after the return from kref_put() so we
           can release the write_lock before the detach_lkb */

        DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}
/* __put_lkb() is used when an lkb may not have an rsb attached to
   it so we need to provide the lockspace explicitly */

static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
        uint32_t lkid = lkb->lkb_id;
        int rv;

        rv = kref_put_lock(&lkb->lkb_ref, kill_lkb,
                           &ls->ls_lkbidr_spin);
        if (rv) {
                idr_remove(&ls->ls_lkbidr, lkid);
                spin_unlock(&ls->ls_lkbidr_spin);

                detach_lkb(lkb);

                /* for local/process lkbs, lvbptr points to caller's lksb */
                if (lkb->lkb_lvbptr && is_master_copy(lkb))
                        dlm_free_lvb(lkb->lkb_lvbptr);
                dlm_free_lkb(lkb);
        }

        return rv;
}
int dlm_put_lkb(struct dlm_lkb *lkb)
{
        struct dlm_ls *ls;

        DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
        DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););

        ls = lkb->lkb_resource->res_ls;
        return __put_lkb(ls, lkb);
}
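/* Sketch of the lkb refcount lifecycle built from the helpers above:
 * create_lkb() takes the initial reference and allocates the id,
 * find_lkb() adds a reference per id lookup, and each dlm_put_lkb()
 * drops one; the final put frees the lkb via kill_lkb():
 *
 *      error = create_lkb(ls, &lkb);             // ref 1, id in ls_lkbidr
 *      error = find_lkb(ls, lkb->lkb_id, &lkb2); // ref 2, lkb2 == lkb
 *      dlm_put_lkb(lkb2);                        // ref back to 1
 *      dlm_put_lkb(lkb);                         // ref 0, lkb freed
 */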
/* This is only called to add a reference when the code already holds
   a valid reference to the lkb, so there's no need for locking. */

static inline void hold_lkb(struct dlm_lkb *lkb)
{
        kref_get(&lkb->lkb_ref);
}

static void unhold_lkb_assert(struct kref *kref)
{
        struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

        DLM_ASSERT(false, dlm_print_lkb(lkb););
}

/* This is called when we need to remove a reference and are certain
   it's not the last ref.  e.g. del_lkb is always called between a
   find_lkb/put_lkb and is always the inverse of a previous add_lkb.
   put_lkb would work fine, but would involve unnecessary locking */

static inline void unhold_lkb(struct dlm_lkb *lkb)
{
        kref_put(&lkb->lkb_ref, unhold_lkb_assert);
}
static void lkb_add_ordered(struct list_head *new, struct list_head *head,
                            int mode)
{
        struct dlm_lkb *lkb = NULL, *iter;

        list_for_each_entry(iter, head, lkb_statequeue)
                if (iter->lkb_rqmode < mode) {
                        lkb = iter;
                        list_add_tail(new, &iter->lkb_statequeue);
                        break;
                }

        if (!lkb)
                list_add_tail(new, head);
}
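/* lkb_add_ordered() keeps the queue in descending mode order: the new
 * entry is linked just ahead of the first entry whose lkb_rqmode is
 * lower than the given mode, or at the tail if none is.  E.g. with
 * existing rqmodes [5, 3, 1], inserting with mode 4 gives [5, 4, 3, 1].
 * (Illustration of the helper above.) */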
/* add/remove lkb to rsb's grant/convert/wait queue */

static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
{
        kref_get(&lkb->lkb_ref);

        DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););

        lkb->lkb_timestamp = ktime_get();

        lkb->lkb_status = status;

        switch (status) {
        case DLM_LKSTS_WAITING:
                if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
                        list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
                else
                        list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
                break;
        case DLM_LKSTS_GRANTED:
                /* convention says granted locks kept in order of grmode */
                lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
                                lkb->lkb_grmode);
                break;
        case DLM_LKSTS_CONVERT:
                if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
                        list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
                else
                        list_add_tail(&lkb->lkb_statequeue,
                                      &r->res_convertqueue);
                break;
        default:
                DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
        }
}
static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        lkb->lkb_status = 0;
        list_del(&lkb->lkb_statequeue);
        unhold_lkb(lkb);
}

static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
        hold_lkb(lkb);
        del_lkb(r, lkb);
        add_lkb(r, lkb, sts);
        unhold_lkb(lkb);
}
static int msg_reply_type(int mstype)
{
        switch (mstype) {
        case DLM_MSG_REQUEST:
                return DLM_MSG_REQUEST_REPLY;
        case DLM_MSG_CONVERT:
                return DLM_MSG_CONVERT_REPLY;
        case DLM_MSG_UNLOCK:
                return DLM_MSG_UNLOCK_REPLY;
        case DLM_MSG_CANCEL:
                return DLM_MSG_CANCEL_REPLY;
        case DLM_MSG_LOOKUP:
                return DLM_MSG_LOOKUP_REPLY;
        }
        return -1;
}
/* add/remove lkb from global waiters list of lkb's waiting for
   a reply from a remote node */

static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
{
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
        int error = 0;

        mutex_lock(&ls->ls_waiters_mutex);

        if (is_overlap_unlock(lkb) ||
            (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
                error = -EINVAL;
                goto out;
        }

        if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
                switch (mstype) {
                case DLM_MSG_UNLOCK:
                        lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
                        break;
                case DLM_MSG_CANCEL:
                        lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
                        break;
                default:
                        error = -EBUSY;
                        goto out;
                }
                lkb->lkb_wait_count++;
                hold_lkb(lkb);

                log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
                          lkb->lkb_id, lkb->lkb_wait_type, mstype,
                          lkb->lkb_wait_count, lkb->lkb_flags);
                goto out;
        }

        DLM_ASSERT(!lkb->lkb_wait_count,
                   dlm_print_lkb(lkb);
                   printk("wait_count %d\n", lkb->lkb_wait_count););

        lkb->lkb_wait_count++;
        lkb->lkb_wait_type = mstype;
        lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
        hold_lkb(lkb);
        list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
 out:
        if (error)
                log_error(ls, "addwait error %x %d flags %x %d %d %s",
                          lkb->lkb_id, error, lkb->lkb_flags, mstype,
                          lkb->lkb_wait_type, lkb->lkb_resource->res_name);
        mutex_unlock(&ls->ls_waiters_mutex);
        return error;
}
/* We clear the RESEND flag because we might be taking an lkb off the waiters
   list as part of process_requestqueue (e.g. a lookup that has an optimized
   request reply on the requestqueue) between dlm_recover_waiters_pre() which
   set RESEND and dlm_recover_waiters_post() */

static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
                                struct dlm_message *ms)
{
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
        int overlap_done = 0;

        if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
                log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
                overlap_done = 1;
                goto out_del;
        }

        if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
                log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
                overlap_done = 1;
                goto out_del;
        }

        /* Cancel state was preemptively cleared by a successful convert,
           see next comment, nothing to do. */

        if ((mstype == DLM_MSG_CANCEL_REPLY) &&
            (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
                log_debug(ls, "remwait %x cancel_reply wait_type %d",
                          lkb->lkb_id, lkb->lkb_wait_type);
                return -1;
        }

        /* Remove for the convert reply, and preemptively remove for the
           cancel reply.  A convert has been granted while there's still
           an outstanding cancel on it (the cancel is moot and the result
           in the cancel reply should be 0).  We preempt the cancel reply
           because the app gets the convert result and then can follow up
           with another op, like convert.  This subsequent op would see the
           lingering state of the cancel and fail with -EBUSY. */

        if ((mstype == DLM_MSG_CONVERT_REPLY) &&
            (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
            is_overlap_cancel(lkb) && ms && !ms->m_result) {
                log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
                          lkb->lkb_id);
                lkb->lkb_wait_type = 0;
                lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
                lkb->lkb_wait_count--;
                unhold_lkb(lkb);
                goto out_del;
        }

        /* N.B. type of reply may not always correspond to type of original
           msg due to lookup->request optimization, verify others? */

        if (lkb->lkb_wait_type) {
                lkb->lkb_wait_type = 0;
                goto out_del;
        }

        log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
                  lkb->lkb_id, ms ? le32_to_cpu(ms->m_header.h_nodeid) : 0,
                  lkb->lkb_remid, mstype, lkb->lkb_flags);
        return -1;

 out_del:
        /* the force-unlock/cancel has completed and we haven't recvd a reply
           to the op that was in progress prior to the unlock/cancel; we
           give up on any reply to the earlier op.  FIXME: not sure when/how
           this would happen */

        if (overlap_done && lkb->lkb_wait_type) {
                log_error(ls, "remwait error %x reply %d wait_type %d overlap",
                          lkb->lkb_id, mstype, lkb->lkb_wait_type);
                lkb->lkb_wait_count--;
                unhold_lkb(lkb);
                lkb->lkb_wait_type = 0;
        }

        DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););

        lkb->lkb_flags &= ~DLM_IFL_RESEND;
        lkb->lkb_wait_count--;
        if (!lkb->lkb_wait_count)
                list_del_init(&lkb->lkb_wait_reply);
        unhold_lkb(lkb);
        return 0;
}
static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
{
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
        int error;

        mutex_lock(&ls->ls_waiters_mutex);
        error = _remove_from_waiters(lkb, mstype, NULL);
        mutex_unlock(&ls->ls_waiters_mutex);
        return error;
}
/* Handles situations where we might be processing a "fake" or "stub" reply in
   which we can't try to take waiters_mutex again. */

static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
{
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;
        int error;

        if (ms->m_flags != cpu_to_le32(DLM_IFL_STUB_MS))
                mutex_lock(&ls->ls_waiters_mutex);
        error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
        if (ms->m_flags != cpu_to_le32(DLM_IFL_STUB_MS))
                mutex_unlock(&ls->ls_waiters_mutex);
        return error;
}
/* If there's an rsb for the same resource being removed, ensure
 * that the remove message is sent before the new lookup message.
 */

#define DLM_WAIT_PENDING_COND(ls, r)            \
        (ls->ls_remove_len &&                   \
         !rsb_cmp(r, ls->ls_remove_name,        \
                  ls->ls_remove_len))
static void wait_pending_remove(struct dlm_rsb *r)
{
        struct dlm_ls *ls = r->res_ls;

 retry:
        spin_lock(&ls->ls_remove_spin);
        if (DLM_WAIT_PENDING_COND(ls, r)) {
                log_debug(ls, "delay lookup for remove dir %d %s",
                          r->res_dir_nodeid, r->res_name);
                spin_unlock(&ls->ls_remove_spin);
                wait_event(ls->ls_remove_wait, !DLM_WAIT_PENDING_COND(ls, r));
                goto retry;
        }
        spin_unlock(&ls->ls_remove_spin);
}
/*
 * ls_remove_spin protects ls_remove_name and ls_remove_len which are
 * read by other threads in wait_pending_remove.  ls_remove_names
 * and ls_remove_lens are only used by the scan thread, so they do
 * not need protection.
 */
static void shrink_bucket(struct dlm_ls *ls, int b)
{
        struct rb_node *n, *next;
        struct dlm_rsb *r;
        char *name;
        int our_nodeid = dlm_our_nodeid();
        int remote_count = 0;
        int need_shrink = 0;
        int i, len, rv;

        memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);

        spin_lock(&ls->ls_rsbtbl[b].lock);

        if (!(ls->ls_rsbtbl[b].flags & DLM_RTF_SHRINK)) {
                spin_unlock(&ls->ls_rsbtbl[b].lock);
                return;
        }

        for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) {
                next = rb_next(n);
                r = rb_entry(n, struct dlm_rsb, res_hashnode);

                /* If we're the directory record for this rsb, and
                   we're not the master of it, then we need to wait
                   for the master node to send us a dir remove
                   before removing the dir record. */

                if (!dlm_no_directory(ls) &&
                    (r->res_master_nodeid != our_nodeid) &&
                    (dlm_dir_nodeid(r) == our_nodeid)) {
                        continue;
                }

                need_shrink = 1;

                if (!time_after_eq(jiffies, r->res_toss_time +
                                   dlm_config.ci_toss_secs * HZ)) {
                        continue;
                }

                if (!dlm_no_directory(ls) &&
                    (r->res_master_nodeid == our_nodeid) &&
                    (dlm_dir_nodeid(r) != our_nodeid)) {

                        /* We're the master of this rsb but we're not
                           the directory record, so we need to tell the
                           dir node to remove the dir record. */

                        ls->ls_remove_lens[remote_count] = r->res_length;
                        memcpy(ls->ls_remove_names[remote_count], r->res_name,
                               DLM_RESNAME_MAXLEN);
                        remote_count++;

                        if (remote_count >= DLM_REMOVE_NAMES_MAX)
                                break;
                        continue;
                }

                if (!kref_put(&r->res_ref, kill_rsb)) {
                        log_error(ls, "tossed rsb in use %s", r->res_name);
                        continue;
                }

                rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
                dlm_free_rsb(r);
        }

        if (need_shrink)
                ls->ls_rsbtbl[b].flags |= DLM_RTF_SHRINK;
        else
                ls->ls_rsbtbl[b].flags &= ~DLM_RTF_SHRINK;
        spin_unlock(&ls->ls_rsbtbl[b].lock);
        /*
         * While searching for rsb's to free, we found some that require
         * remote removal.  We leave them in place and find them again here
         * so there is a very small gap between removing them from the toss
         * list and sending the removal.  Keeping this gap small is
         * important to keep us (the master node) from being out of sync
         * with the remote dir node for very long.
         *
         * From the time the rsb is removed from toss until just after
         * send_remove, the rsb name is saved in ls_remove_name.  A new
         * lookup checks this to ensure that a new lookup message for the
         * same resource name is not sent just before the remove message.
         */
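        /* Timeline of the fencing described above (a sketch; master node
         * on the left, the resource's dir node on the right):
         *
         *      ls_remove_name = name           (under ls_remove_spin)
         *      send_remove(r)          ---->   receive_remove(): drop record
         *      ls_remove_len = 0, wake_up(&ls->ls_remove_wait)
         *
         * A concurrent lookup for the same name blocks in
         * wait_pending_remove() between the first and last steps, so a
         * new lookup message cannot be sent ahead of the pending remove. */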
        for (i = 0; i < remote_count; i++) {
                name = ls->ls_remove_names[i];
                len = ls->ls_remove_lens[i];

                spin_lock(&ls->ls_rsbtbl[b].lock);
                rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
                if (rv) {
                        spin_unlock(&ls->ls_rsbtbl[b].lock);
                        log_debug(ls, "remove_name not toss %s", name);
                        continue;
                }

                if (r->res_master_nodeid != our_nodeid) {
                        spin_unlock(&ls->ls_rsbtbl[b].lock);
                        log_debug(ls, "remove_name master %d dir %d our %d %s",
                                  r->res_master_nodeid, r->res_dir_nodeid,
                                  our_nodeid, name);
                        continue;
                }

                if (r->res_dir_nodeid == our_nodeid) {
                        /* should never happen */
                        spin_unlock(&ls->ls_rsbtbl[b].lock);
                        log_error(ls, "remove_name dir %d master %d our %d %s",
                                  r->res_dir_nodeid, r->res_master_nodeid,
                                  our_nodeid, name);
                        continue;
                }

                if (!time_after_eq(jiffies, r->res_toss_time +
                                   dlm_config.ci_toss_secs * HZ)) {
                        spin_unlock(&ls->ls_rsbtbl[b].lock);
                        log_debug(ls, "remove_name toss_time %lu now %lu %s",
                                  r->res_toss_time, jiffies, name);
                        continue;
                }

                if (!kref_put(&r->res_ref, kill_rsb)) {
                        spin_unlock(&ls->ls_rsbtbl[b].lock);
                        log_error(ls, "remove_name in use %s", name);
                        continue;
                }

                rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);

                /* block lookup of same name until we've sent remove */
                spin_lock(&ls->ls_remove_spin);
                ls->ls_remove_len = len;
                memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
                spin_unlock(&ls->ls_remove_spin);
                spin_unlock(&ls->ls_rsbtbl[b].lock);

                send_remove(r);

                /* allow lookup of name again */
                spin_lock(&ls->ls_remove_spin);
                ls->ls_remove_len = 0;
                memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
                spin_unlock(&ls->ls_remove_spin);
                wake_up(&ls->ls_remove_wait);

                dlm_free_rsb(r);
        }
}
void dlm_scan_rsbs(struct dlm_ls *ls)
{
        int i;

        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
                shrink_bucket(ls, i);
                if (dlm_locking_stopped(ls))
                        break;
                cond_resched();
        }
}
#ifdef CONFIG_DLM_DEPRECATED_API
static void add_timeout(struct dlm_lkb *lkb)
{
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;

        if (is_master_copy(lkb))
                return;

        if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
            !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
                lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
                goto add_it;
        }
        if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
                goto add_it;
        return;

 add_it:
        DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
        mutex_lock(&ls->ls_timeout_mutex);
        hold_lkb(lkb);
        list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
        mutex_unlock(&ls->ls_timeout_mutex);
}

static void del_timeout(struct dlm_lkb *lkb)
{
        struct dlm_ls *ls = lkb->lkb_resource->res_ls;

        mutex_lock(&ls->ls_timeout_mutex);
        if (!list_empty(&lkb->lkb_time_list)) {
                list_del_init(&lkb->lkb_time_list);
                unhold_lkb(lkb);
        }
        mutex_unlock(&ls->ls_timeout_mutex);
}
/* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
   lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
   and then lock rsb because of lock ordering in add_timeout.  We may need
   to specify some special timeout-related bits in the lkb that are just to
   be accessed under the timeout_mutex. */
void dlm_scan_timeout(struct dlm_ls *ls)
{
        struct dlm_rsb *r;
        struct dlm_lkb *lkb = NULL, *iter;
        int do_cancel, do_warn;
        s64 wait_us;

        for (;;) {
                if (dlm_locking_stopped(ls))
                        break;

                do_cancel = 0;
                do_warn = 0;
                mutex_lock(&ls->ls_timeout_mutex);
                list_for_each_entry(iter, &ls->ls_timeout, lkb_time_list) {

                        wait_us = ktime_to_us(ktime_sub(ktime_get(),
                                                        iter->lkb_timestamp));

                        if ((iter->lkb_exflags & DLM_LKF_TIMEOUT) &&
                            wait_us >= (iter->lkb_timeout_cs * 10000))
                                do_cancel = 1;

                        if ((iter->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
                            wait_us >= dlm_config.ci_timewarn_cs * 10000)
                                do_warn = 1;

                        if (!do_cancel && !do_warn)
                                continue;
                        hold_lkb(iter);
                        lkb = iter;
                        break;
                }
                mutex_unlock(&ls->ls_timeout_mutex);

                if (!lkb)
                        break;

                r = lkb->lkb_resource;
                hold_rsb(r);
                lock_rsb(r);

                if (do_warn) {
                        /* clear flag so we only warn once */
                        lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
                        if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
                                del_timeout(lkb);
                        dlm_timeout_warn(lkb);
                }

                if (do_cancel) {
                        log_debug(ls, "timeout cancel %x node %d %s",
                                  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
                        lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
                        lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
                        del_timeout(lkb);
                        _cancel_lock(r, lkb);
                }

                unlock_rsb(r);
                unhold_rsb(r);
                dlm_put_lkb(lkb);
                lkb = NULL;
        }
}
/* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
   dlm_recoverd before checking/setting ls_recover_begin. */

void dlm_adjust_timeouts(struct dlm_ls *ls)
{
        struct dlm_lkb *lkb;
        u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);

        ls->ls_recover_begin = 0;
        mutex_lock(&ls->ls_timeout_mutex);
        list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
                lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
        mutex_unlock(&ls->ls_timeout_mutex);
}
#else
static void add_timeout(struct dlm_lkb *lkb) { }
static void del_timeout(struct dlm_lkb *lkb) { }
#endif
/* lkb is master or local copy */

static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        int b, len = r->res_ls->ls_lvblen;

        /* b=1 lvb returned to caller
           b=0 lvb written to rsb or invalidated
           b=-1 do nothing */

        b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];

        if (b == 1) {
                if (!lkb->lkb_lvbptr)
                        return;

                if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
                        return;

                if (!r->res_lvbptr)
                        return;

                memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
                lkb->lkb_lvbseq = r->res_lvbseq;

        } else if (b == 0) {
                if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
                        rsb_set_flag(r, RSB_VALNOTVALID);
                        return;
                }

                if (!lkb->lkb_lvbptr)
                        return;

                if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
                        return;

                if (!r->res_lvbptr)
                        r->res_lvbptr = dlm_allocate_lvb(r->res_ls);

                if (!r->res_lvbptr)
                        return;

                memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
                r->res_lvbseq++;
                lkb->lkb_lvbseq = r->res_lvbseq;
                rsb_clear_flag(r, RSB_VALNOTVALID);
        }

        if (rsb_flag(r, RSB_VALNOTVALID))
                lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
}
static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        if (lkb->lkb_grmode < DLM_LOCK_PW)
                return;

        if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
                rsb_set_flag(r, RSB_VALNOTVALID);
                return;
        }

        if (!lkb->lkb_lvbptr)
                return;

        if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
                return;

        if (!r->res_lvbptr)
                r->res_lvbptr = dlm_allocate_lvb(r->res_ls);

        if (!r->res_lvbptr)
                return;

        memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
        r->res_lvbseq++;
        rsb_clear_flag(r, RSB_VALNOTVALID);
}
/* lkb is process copy (pc) */

static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
                            struct dlm_message *ms)
{
        int b;

        if (!lkb->lkb_lvbptr)
                return;

        if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
                return;

        b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
        if (b == 1) {
                int len = receive_extralen(ms);
                if (len > r->res_ls->ls_lvblen)
                        len = r->res_ls->ls_lvblen;
                memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
                lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
        }
}
/* Manipulate lkb's on rsb's convert/granted/waiting queues
   remove_lock -- used for unlock, removes lkb from granted
   revert_lock -- used for cancel, moves lkb from convert to granted
   grant_lock  -- used for request and convert, adds lkb to granted or
                  moves lkb from convert or waiting to granted

   Each of these is used for master or local copy lkb's.  There is
   also a _pc() variation used to make the corresponding change on
   a process copy (pc) lkb. */

static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        del_lkb(r, lkb);
        lkb->lkb_grmode = DLM_LOCK_IV;
        /* this unhold undoes the original ref from create_lkb()
           so this leads to the lkb being freed */
        unhold_lkb(lkb);
}

static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        set_lvb_unlock(r, lkb);
        _remove_lock(r, lkb);
}

static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        _remove_lock(r, lkb);
}
/* returns: 0 did nothing
            1 moved lock to granted
           -1 removed lock */

static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        int rv = 0;

        lkb->lkb_rqmode = DLM_LOCK_IV;

        switch (lkb->lkb_status) {
        case DLM_LKSTS_GRANTED:
                break;
        case DLM_LKSTS_CONVERT:
                move_lkb(r, lkb, DLM_LKSTS_GRANTED);
                rv = 1;
                break;
        case DLM_LKSTS_WAITING:
                del_lkb(r, lkb);
                lkb->lkb_grmode = DLM_LOCK_IV;
                /* this unhold undoes the original ref from create_lkb()
                   so this leads to the lkb being freed */
                unhold_lkb(lkb);
                rv = -1;
                break;
        default:
                log_print("invalid status for revert %d", lkb->lkb_status);
        }
        return rv;
}

static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        return revert_lock(r, lkb);
}
static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        if (lkb->lkb_grmode != lkb->lkb_rqmode) {
                lkb->lkb_grmode = lkb->lkb_rqmode;
                if (lkb->lkb_status)
                        move_lkb(r, lkb, DLM_LKSTS_GRANTED);
                else
                        add_lkb(r, lkb, DLM_LKSTS_GRANTED);
        }

        lkb->lkb_rqmode = DLM_LOCK_IV;
        lkb->lkb_highbast = 0;
}

static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        set_lvb_lock(r, lkb);
        _grant_lock(r, lkb);
}

static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
                          struct dlm_message *ms)
{
        set_lvb_lock_pc(r, lkb, ms);
        _grant_lock(r, lkb);
}
/* called by grant_pending_locks() which means an async grant message must
   be sent to the requesting node in addition to granting the lock if the
   lkb belongs to a remote node. */

static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
        grant_lock(r, lkb);
        if (is_master_copy(lkb))
                send_grant(r, lkb);
        else
                queue_cast(r, lkb, 0);
}
/* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
   change the granted/requested modes.  We're munging things accordingly in
   the process copy.
   CONVDEADLK: our grmode may have been forced down to NL to resolve a
   conversion deadlock
   ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
   compatible with other granted locks */
static void munge_demoted(struct dlm_lkb *lkb)
{
        if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
                log_print("munge_demoted %x invalid modes gr %d rq %d",
                          lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
                return;
        }

        lkb->lkb_grmode = DLM_LOCK_NL;
}
static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
{
        if (ms->m_type != cpu_to_le32(DLM_MSG_REQUEST_REPLY) &&
            ms->m_type != cpu_to_le32(DLM_MSG_GRANT)) {
                log_print("munge_altmode %x invalid reply type %d",
                          lkb->lkb_id, le32_to_cpu(ms->m_type));
                return;
        }

        if (lkb->lkb_exflags & DLM_LKF_ALTPR)
                lkb->lkb_rqmode = DLM_LOCK_PR;
        else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
                lkb->lkb_rqmode = DLM_LOCK_CW;
        else {
                log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
                dlm_print_lkb(lkb);
        }
}
static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
{
        struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
                                           lkb_statequeue);
        if (lkb->lkb_id == first->lkb_id)
                return 1;
        return 0;
}
/* Check if the given lkb conflicts with another lkb on the queue. */

static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
{
        struct dlm_lkb *this;

        list_for_each_entry(this, head, lkb_statequeue) {
                if (this == lkb)
                        continue;
                if (!modes_compat(this, lkb))
                        return 1;
        }
        return 0;
}
2184 * "A conversion deadlock arises with a pair of lock requests in the converting
2185 * queue for one resource. The granted mode of each lock blocks the requested
2186 * mode of the other lock."
2188 * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
2189 * convert queue from being granted, then deadlk/demote lkb.
2192 * Granted Queue: empty
2193 * Convert Queue: NL->EX (first lock)
2194 * PR->EX (second lock)
2196 * The first lock can't be granted because of the granted mode of the second
2197 * lock and the second lock can't be granted because it's not first in the
2198 * list. We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
2199 * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
2200 * flag set and return DEMOTED in the lksb flags.
2202 * Originally, this function detected conv-deadlk in a more limited scope:
2203 * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
2204 * - if lkb1 was the first entry in the queue (not just earlier), and was
2205 * blocked by the granted mode of lkb2, and there was nothing on the
2206 * granted queue preventing lkb1 from being granted immediately, i.e.
2207 * lkb2 was the only thing preventing lkb1 from being granted.
2209 * That second condition meant we'd only say there was conv-deadlk if
2210 * resolving it (by demotion) would lead to the first lock on the convert
2211 * queue being granted right away. It allowed conversion deadlocks to exist
2212 * between locks on the convert queue while they couldn't be granted anyway.
2214 * Now, we detect and take action on conversion deadlocks immediately when
2215 * they're created, even if they may not be immediately consequential. If
2216 * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
2217 * mode that would prevent lkb1's conversion from being granted, we do a
2218 * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
2219 * I think this means that the lkb_is_ahead condition below should always
2220 * be zero, i.e. there will never be conv-deadlk between two locks that are
2221 * both already on the convert queue.
static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
{
        struct dlm_lkb *lkb1;
        int lkb_is_ahead = 0;

        list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
                if (lkb1 == lkb2) {
                        lkb_is_ahead = 1;
                        continue;
                }

                if (!lkb_is_ahead) {
                        if (!modes_compat(lkb2, lkb1))
                                return 1;
                } else {
                        if (!modes_compat(lkb2, lkb1) &&
                            !modes_compat(lkb1, lkb2))
                                return 1;
                }
        }
        return 0;
}
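/* Walking the example from the comment above through this function:
 * the convert queue holds lkb1 = NL->EX, and lkb2 = PR->EX arrives.
 * lkb2 is not yet on the queue, so lkb_is_ahead stays 0, and
 * modes_compat(lkb2, lkb1) asks whether lkb2's granted PR allows lkb1's
 * requested EX; it does not, so the function returns 1 and the caller
 * resolves the deadlock by demoting or cancelling lkb2. */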
/*
 * Return 1 if the lock can be granted, 0 otherwise.
 * Also detect and resolve conversion deadlocks.
 *
 * lkb is the lock to be granted
 *
 * now is 1 if the function is being called in the context of the
 * immediate request, it is 0 if called later, after the lock has been
 * queued.
 *
 * recover is 1 if dlm_recover_grant() is trying to grant conversions
 * after recovery.
 *
 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
 */
2263 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2266 int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
2269 * 6-10: Version 5.4 introduced an option to address the phenomenon of
2270 * a new request for a NL mode lock being blocked.
2272 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
2273 * request, then it would be granted. In essence, the use of this flag
2274 * tells the Lock Manager to expedite theis request by not considering
2275 * what may be in the CONVERTING or WAITING queues... As of this
2276 * writing, the EXPEDITE flag can be used only with new requests for NL
2277 * mode locks. This flag is not valid for conversion requests.
2279 * A shortcut. Earlier checks return an error if EXPEDITE is used in a
2280 * conversion or used with a non-NL requested mode. We also know an
2281 * EXPEDITE request is always granted immediately, so now must always
2282 * be 1. The full condition to grant an expedite request: (now &&
2283 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
2284 * therefore be shortened to just checking the flag.
2287 if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
2291 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
2292 * added to the remaining conditions.
2295 if (queue_conflict(&r->res_grantqueue, lkb))
2299 * 6-3: By default, a conversion request is immediately granted if the
2300 * requested mode is compatible with the modes of all other granted
2304 if (queue_conflict(&r->res_convertqueue, lkb))
2308 * The RECOVER_GRANT flag means dlm_recover_grant() is granting
2309 * locks for a recovered rsb, on which lkb's have been rebuilt.
2310 * The lkb's may have been rebuilt on the queues in a different
2311 * order than they were in on the previous master. So, granting
2312 * queued conversions in order after recovery doesn't make sense
2313 * since the order hasn't been preserved anyway. The new order
2314 * could also have created a new "in place" conversion deadlock.
2315 * (e.g. old, failed master held granted EX, with PR->EX, NL->EX.
2316 * After recovery, there would be no granted locks, and possibly
2317 * NL->EX, PR->EX, an in-place conversion deadlock.) So, after
2318 * recovery, grant conversions without considering order.
2321 if (conv && recover)
2325 * 6-5: But the default algorithm for deciding whether to grant or
2326 * queue conversion requests does not by itself guarantee that such
2327 * requests are serviced on a "first come first serve" basis. This, in
2328 * turn, can lead to a phenomenon known as "indefinite postponement".
2330 * 6-7: This issue is dealt with by using the optional QUECVT flag with
2331 * the system service employed to request a lock conversion. This flag
2332 * forces certain conversion requests to be queued, even if they are
2333 * compatible with the granted modes of other locks on the same
2334 * resource. Thus, the use of this flag results in conversion requests
2335 * being ordered on a "first come first serve" basis.
2337 * DCT: This condition is all about new conversions being able to occur
2338 * "in place" while the lock remains on the granted queue (assuming
2339 * nothing else conflicts.) IOW if QUECVT isn't set, a conversion
2340 * doesn't _have_ to go onto the convert queue where it's processed in
2341 * order. The "now" variable is necessary to distinguish converts
2342 * being received and processed for the first time now, because once a
2343 * convert is moved to the conversion queue the condition below applies
2344 * requiring fifo granting.
2347 if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
2351 * Even if the convert is compat with all granted locks,
2352 * QUECVT forces it behind other locks on the convert queue.
2355 if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
2356 if (list_empty(&r->res_convertqueue))
2363 * The NOORDER flag is set to avoid the standard VMS rules on grant
2367 if (lkb->lkb_exflags & DLM_LKF_NOORDER)
2371 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
2372 * granted until all other conversion requests ahead of it are granted
2376 if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
2380 * 6-4: By default, a new request is immediately granted only if all
2381 * three of the following conditions are satisfied when the request is
2383 * - The queue of ungranted conversion requests for the resource is
2385 * - The queue of ungranted new requests for the resource is empty.
2386 * - The mode of the new request is compatible with the most
2387 * restrictive mode of all granted locks on the resource.
2390 if (now && !conv && list_empty(&r->res_convertqueue) &&
2391 list_empty(&r->res_waitqueue))
2395 * 6-4: Once a lock request is in the queue of ungranted new requests,
2396 * it cannot be granted until the queue of ungranted conversion
2397 * requests is empty, all ungranted new requests ahead of it are
2398 * granted and/or canceled, and it is compatible with the granted mode
2399 * of the most restrictive lock granted on the resource.
2402 if (!now && !conv && list_empty(&r->res_convertqueue) &&
2403 first_in_list(lkb, &r->res_waitqueue))
2409 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2410 int recover, int *err)
2413 int8_t alt = 0, rqmode = lkb->lkb_rqmode;
2414 int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
2419 rv = _can_be_granted(r, lkb, now, recover);
2424 * The CONVDEADLK flag is non-standard and tells the dlm to resolve
2425 * conversion deadlocks by demoting grmode to NL, otherwise the dlm
2426 * cancels one of the locks.
2429 if (is_convert && can_be_queued(lkb) &&
2430 conversion_deadlock_detect(r, lkb)) {
2431 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
2432 lkb->lkb_grmode = DLM_LOCK_NL;
2433 lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
2437 log_print("can_be_granted deadlock %x now %d",
2445 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
2446 * to grant a request in a mode other than the normal rqmode. It's a
2447 * simple way to provide a big optimization to applications that can
2451 if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
2453 else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
2457 lkb->lkb_rqmode = alt;
2458 rv = _can_be_granted(r, lkb, now, 0);
2460 lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
2462 lkb->lkb_rqmode = rqmode;
2468 /* Returns the highest requested mode of all blocked conversions; sets
2469 cw if there's a blocked conversion to DLM_LOCK_CW. */
2471 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
2472 unsigned int *count)
2474 struct dlm_lkb *lkb, *s;
2475 int recover = rsb_flag(r, RSB_RECOVER_GRANT);
2476 int hi, demoted, quit, grant_restart, demote_restart;
2485 list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
2486 demoted = is_demoted(lkb);
2489 if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
2490 grant_lock_pending(r, lkb);
2497 if (!demoted && is_demoted(lkb)) {
2498 log_print("WARN: pending demoted %x node %d %s",
2499 lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2506 * If the DLM_LKF_NODLCKWT flag is set and a conversion
2507 * deadlock is detected, we request a blocking AST so the
2508 * conversion can be down-converted (or canceled).
2510 if (lkb->lkb_exflags & DLM_LKF_NODLCKWT) {
2511 if (lkb->lkb_highbast < lkb->lkb_rqmode) {
2512 queue_bast(r, lkb, lkb->lkb_rqmode);
2513 lkb->lkb_highbast = lkb->lkb_rqmode;
2516 log_print("WARN: pending deadlock %x node %d %s",
2517 lkb->lkb_id, lkb->lkb_nodeid,
2524 hi = max_t(int, lkb->lkb_rqmode, hi);
2526 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
2532 if (demote_restart && !quit) {
2537 return max_t(int, high, hi);
2540 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
2541 unsigned int *count)
2543 struct dlm_lkb *lkb, *s;
2545 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
2546 if (can_be_granted(r, lkb, 0, 0, NULL)) {
2547 grant_lock_pending(r, lkb);
2551 high = max_t(int, lkb->lkb_rqmode, high);
2552 if (lkb->lkb_rqmode == DLM_LOCK_CW)
2560 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
2561 on either the convert or waiting queue.
2562 high is the largest rqmode of all locks blocked on the convert or
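Why cw is tracked separately from high (a worked case): DLM_LOCK_CW
(2) sorts below DLM_LOCK_PR (3), so when high works out to PR, a
blocked CW request is invisible in high alone, and a PR-granted lock
passes the compat test against PR. PR and CW are mutually
incompatible, though, so the cw flag lets grant_pending_locks() below
send such PR holders a bast for DLM_LOCK_CW.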
2565 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
2567 if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
2568 if (gr->lkb_highbast < DLM_LOCK_EX)
2573 if (gr->lkb_highbast < high &&
2574 !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
2579 static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
2581 struct dlm_lkb *lkb, *s;
2582 int high = DLM_LOCK_IV;
2585 if (!is_master(r)) {
2586 log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
2591 high = grant_pending_convert(r, high, &cw, count);
2592 high = grant_pending_wait(r, high, &cw, count);
2594 if (high == DLM_LOCK_IV)
2598 * If there are locks left on the wait/convert queue then send blocking
2599 * ASTs to granted locks based on the largest requested mode (high)
2603 list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
2604 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
2605 if (cw && high == DLM_LOCK_PR &&
2606 lkb->lkb_grmode == DLM_LOCK_PR)
2607 queue_bast(r, lkb, DLM_LOCK_CW);
2609 queue_bast(r, lkb, high);
2610 lkb->lkb_highbast = high;
2615 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
2617 if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
2618 (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
2619 if (gr->lkb_highbast < DLM_LOCK_EX)
2624 if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
2629 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2630 struct dlm_lkb *lkb)
2634 list_for_each_entry(gr, head, lkb_statequeue) {
2635 /* skip self when sending basts to convertqueue */
2638 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2639 queue_bast(r, gr, lkb->lkb_rqmode);
2640 gr->lkb_highbast = lkb->lkb_rqmode;
2645 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2647 send_bast_queue(r, &r->res_grantqueue, lkb);
2650 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2652 send_bast_queue(r, &r->res_grantqueue, lkb);
2653 send_bast_queue(r, &r->res_convertqueue, lkb);
2656 /* set_master(r, lkb) -- set the master nodeid of a resource
2658 The purpose of this function is to set the nodeid field in the given
2659 lkb using the nodeid field in the given rsb. If the rsb's nodeid is
2660 known, it can just be copied to the lkb and the function will return
2661 0. If the rsb's nodeid is _not_ known, it needs to be looked up
2662 before it can be copied to the lkb.
2664 When the rsb nodeid is being looked up remotely, the initial lkb
2665 causing the lookup is kept on the ls_waiters list waiting for the
2666 lookup reply. Other lkb's waiting for the same rsb lookup are kept
2667 on the rsb's res_lookup list until the master is verified.
2670 0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2671 1: the rsb master is not available and the lkb has been placed on
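
How _request_lock() below consumes these returns (a sketch of the
caller contract):

	error = set_master(r, lkb);
	if (error < 0)
		goto out;
	if (error == 1) {
		error = 0;
		goto out;
	}
	if (is_remote(r))
		error = send_request(r, lkb);
	else
		error = do_request(r, lkb);

On 1 the lkb is parked (on ls_waiters or res_lookup) until the
lookup reply re-drives it through _request_lock().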
2675 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2677 int our_nodeid = dlm_our_nodeid();
2679 if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2680 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2681 r->res_first_lkid = lkb->lkb_id;
2682 lkb->lkb_nodeid = r->res_nodeid;
2686 if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2687 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2691 if (r->res_master_nodeid == our_nodeid) {
2692 lkb->lkb_nodeid = 0;
2696 if (r->res_master_nodeid) {
2697 lkb->lkb_nodeid = r->res_master_nodeid;
2701 if (dlm_dir_nodeid(r) == our_nodeid) {
2702 /* This is a somewhat unusual case; find_rsb will usually
2703 have set res_master_nodeid when dir nodeid is local, but
2704 there are cases where we become the dir node after we've
2705 passed find_rsb and gone through _request_lock again.
2706 confirm_master() or process_lookup_list() needs to be
2707 called after this. */
2708 log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
2709 lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2711 r->res_master_nodeid = our_nodeid;
2713 lkb->lkb_nodeid = 0;
2717 wait_pending_remove(r);
2719 r->res_first_lkid = lkb->lkb_id;
2720 send_lookup(r, lkb);
2724 static void process_lookup_list(struct dlm_rsb *r)
2726 struct dlm_lkb *lkb, *safe;
2728 list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2729 list_del_init(&lkb->lkb_rsb_lookup);
2730 _request_lock(r, lkb);
2735 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2737 static void confirm_master(struct dlm_rsb *r, int error)
2739 struct dlm_lkb *lkb;
2741 if (!r->res_first_lkid)
2747 r->res_first_lkid = 0;
2748 process_lookup_list(r);
2754 /* the remote request failed and won't be retried (it was
2755 a NOQUEUE, or has been canceled/unlocked); make a waiting
2756 lkb the first_lkid */
2758 r->res_first_lkid = 0;
2760 if (!list_empty(&r->res_lookup)) {
2761 lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2763 list_del_init(&lkb->lkb_rsb_lookup);
2764 r->res_first_lkid = lkb->lkb_id;
2765 _request_lock(r, lkb);
2770 log_error(r->res_ls, "confirm_master unknown error %d", error);
2774 #ifdef CONFIG_DLM_DEPRECATED_API
2775 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2776 int namelen, unsigned long timeout_cs,
2777 void (*ast) (void *astparam),
2779 void (*bast) (void *astparam, int mode),
2780 struct dlm_args *args)
2782 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2783 int namelen, void (*ast)(void *astparam),
2785 void (*bast)(void *astparam, int mode),
2786 struct dlm_args *args)
2791 /* check for invalid arg usage */
2793 if (mode < 0 || mode > DLM_LOCK_EX)
2796 if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2799 if (flags & DLM_LKF_CANCEL)
2802 if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2805 if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2808 if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2811 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2814 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2817 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2820 if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2826 if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2829 if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2832 /* these args will be copied to the lkb in validate_lock_args,
2833 it cannot be done now because when converting locks, fields in
2834 an active lkb cannot be modified before locking the rsb */
2836 args->flags = flags;
2838 args->astparam = astparam;
2839 args->bastfn = bast;
2840 #ifdef CONFIG_DLM_DEPRECATED_API
2841 args->timeout = timeout_cs;
2850 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2852 if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2853 DLM_LKF_FORCEUNLOCK))
2856 if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2859 args->flags = flags;
2860 args->astparam = astarg;
2864 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2865 struct dlm_args *args)
2869 if (args->flags & DLM_LKF_CONVERT) {
2870 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2873 if (args->flags & DLM_LKF_QUECVT &&
2874 !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2878 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2881 /* lock not allowed if there's any op in progress */
2882 if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2885 if (is_overlap(lkb))
2889 lkb->lkb_exflags = args->flags;
2890 lkb->lkb_sbflags = 0;
2891 lkb->lkb_astfn = args->astfn;
2892 lkb->lkb_astparam = args->astparam;
2893 lkb->lkb_bastfn = args->bastfn;
2894 lkb->lkb_rqmode = args->mode;
2895 lkb->lkb_lksb = args->lksb;
2896 lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2897 lkb->lkb_ownpid = (int) current->pid;
2898 #ifdef CONFIG_DLM_DEPRECATED_API
2899 lkb->lkb_timeout_cs = args->timeout;
2904 log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2905 rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2906 lkb->lkb_status, lkb->lkb_wait_type,
2907 lkb->lkb_resource->res_name);
2911 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2914 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2915 because there may be a lookup in progress and it's valid to do
2916 cancel/force-unlock on it */
2918 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2920 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2923 if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2924 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2929 /* an lkb may still exist even though the lock is EOL'ed due to a
2930 cancel, unlock or failed noqueue request; an app can't use these
2931 locks; return same error as if the lkid had not been found at all */
2933 if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2934 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2939 /* an lkb may be waiting for an rsb lookup to complete where the
2940 lookup was initiated by another lock */
2942 if (!list_empty(&lkb->lkb_rsb_lookup)) {
2943 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2944 log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2945 list_del_init(&lkb->lkb_rsb_lookup);
2946 queue_cast(lkb->lkb_resource, lkb,
2947 args->flags & DLM_LKF_CANCEL ?
2948 -DLM_ECANCEL : -DLM_EUNLOCK);
2949 unhold_lkb(lkb); /* undoes create_lkb() */
2951 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2956 /* cancel not allowed with another cancel/unlock in progress */
2958 if (args->flags & DLM_LKF_CANCEL) {
2959 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2962 if (is_overlap(lkb))
2965 /* don't let scand try to do a cancel */
2968 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2969 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2974 /* there's nothing to cancel */
2975 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2976 !lkb->lkb_wait_type) {
2981 switch (lkb->lkb_wait_type) {
2982 case DLM_MSG_LOOKUP:
2983 case DLM_MSG_REQUEST:
2984 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2987 case DLM_MSG_UNLOCK:
2988 case DLM_MSG_CANCEL:
2991 /* add_to_waiters() will set OVERLAP_CANCEL */
2995 /* do we need to allow a force-unlock if there's a normal unlock
2996 already in progress? in what conditions could the normal unlock
2997 fail such that we'd want to send a force-unlock to be sure? */
2999 if (args->flags & DLM_LKF_FORCEUNLOCK) {
3000 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
3003 if (is_overlap_unlock(lkb))
3006 /* don't let scand try to do a cancel */
3009 if (lkb->lkb_flags & DLM_IFL_RESEND) {
3010 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
3015 switch (lkb->lkb_wait_type) {
3016 case DLM_MSG_LOOKUP:
3017 case DLM_MSG_REQUEST:
3018 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
3021 case DLM_MSG_UNLOCK:
3024 /* add_to_waiters() will set OVERLAP_UNLOCK */
3028 /* normal unlock not allowed if there's any op in progress */
3030 if (lkb->lkb_wait_type || lkb->lkb_wait_count)
3034 /* an overlapping op shouldn't blow away exflags from other op */
3035 lkb->lkb_exflags |= args->flags;
3036 lkb->lkb_sbflags = 0;
3037 lkb->lkb_astparam = args->astparam;
3041 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
3042 lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
3043 args->flags, lkb->lkb_wait_type,
3044 lkb->lkb_resource->res_name);
3049 * Four stage 4 varieties:
3050 * do_request(), do_convert(), do_unlock(), do_cancel()
3051 * These are called on the master node for the given lock and
3052 * from the central locking logic.
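*
* Their return values double as the reply status for remote
* operations: 0 means granted now, -EINPROGRESS means queued,
* -EAGAIN means a DLM_LKF_NOQUEUE request was blocked, and
* do_unlock()/do_cancel() report -DLM_EUNLOCK/-DLM_ECANCEL on
* success. The same value is handed to the matching
* do_xxxx_effects() call.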
3055 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3059 if (can_be_granted(r, lkb, 1, 0, NULL)) {
3061 queue_cast(r, lkb, 0);
3065 if (can_be_queued(lkb)) {
3066 error = -EINPROGRESS;
3067 add_lkb(r, lkb, DLM_LKSTS_WAITING);
3073 queue_cast(r, lkb, -EAGAIN);
3078 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3083 if (force_blocking_asts(lkb))
3084 send_blocking_asts_all(r, lkb);
3087 send_blocking_asts(r, lkb);
3092 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3097 /* changing an existing lock may allow others to be granted */
3099 if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
3101 queue_cast(r, lkb, 0);
3105 /* can_be_granted() detected that this lock would block in a conversion
3106 deadlock, so we leave it on the granted queue and return EDEADLK in
3107 the ast for the convert. */
3109 if (deadlk && !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
3110 /* it's left on the granted queue */
3111 revert_lock(r, lkb);
3112 queue_cast(r, lkb, -EDEADLK);
3117 /* is_demoted() means the can_be_granted() above set the grmode
3118 to NL, and left us on the granted queue. This auto-demotion
3119 (due to CONVDEADLK) might mean other locks, and/or this lock, are
3120 now grantable. We have to try to grant other converting locks
3121 before we try again to grant this one. */
3123 if (is_demoted(lkb)) {
3124 grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
3125 if (_can_be_granted(r, lkb, 1, 0)) {
3127 queue_cast(r, lkb, 0);
3130 /* else fall through and move to convert queue */
3133 if (can_be_queued(lkb)) {
3134 error = -EINPROGRESS;
3136 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3142 queue_cast(r, lkb, -EAGAIN);
3147 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3152 grant_pending_locks(r, NULL);
3153 /* grant_pending_locks also sends basts */
3156 if (force_blocking_asts(lkb))
3157 send_blocking_asts_all(r, lkb);
3160 send_blocking_asts(r, lkb);
3165 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3167 remove_lock(r, lkb);
3168 queue_cast(r, lkb, -DLM_EUNLOCK);
3169 return -DLM_EUNLOCK;
3172 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3175 grant_pending_locks(r, NULL);
3178 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
3180 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3184 error = revert_lock(r, lkb);
3186 queue_cast(r, lkb, -DLM_ECANCEL);
3187 return -DLM_ECANCEL;
3192 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3196 grant_pending_locks(r, NULL);
3200 * Four stage 3 varieties:
3201 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
3204 /* add a new lkb to a possibly new rsb, called by requesting process */
3206 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3210 /* set_master: sets lkb nodeid from r */
3212 error = set_master(r, lkb);
3221 /* receive_request() calls do_request() on remote node */
3222 error = send_request(r, lkb);
3224 error = do_request(r, lkb);
3225 /* for remote locks the request_reply is sent
3226 between do_request and do_request_effects */
3227 do_request_effects(r, lkb, error);
3233 /* change some property of an existing lkb, e.g. mode */
3235 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3240 /* receive_convert() calls do_convert() on remote node */
3241 error = send_convert(r, lkb);
3243 error = do_convert(r, lkb);
3244 /* for remote locks the convert_reply is sent
3245 between do_convert and do_convert_effects */
3246 do_convert_effects(r, lkb, error);
3252 /* remove an existing lkb from the granted queue */
3254 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3259 /* receive_unlock() calls do_unlock() on remote node */
3260 error = send_unlock(r, lkb);
3262 error = do_unlock(r, lkb);
3263 /* for remote locks the unlock_reply is sent
3264 between do_unlock and do_unlock_effects */
3265 do_unlock_effects(r, lkb, error);
3271 /* remove an existing lkb from the convert or wait queue */
3273 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3278 /* receive_cancel() calls do_cancel() on remote node */
3279 error = send_cancel(r, lkb);
3281 error = do_cancel(r, lkb);
3282 /* for remote locks the cancel_reply is sent
3283 between do_cancel and do_cancel_effects */
3284 do_cancel_effects(r, lkb, error);
3291 * Four stage 2 varieties:
3292 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
3295 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
3296 int len, struct dlm_args *args)
3301 error = validate_lock_args(ls, lkb, args);
3305 error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
3312 lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
3314 error = _request_lock(r, lkb);
3321 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3322 struct dlm_args *args)
3327 r = lkb->lkb_resource;
3332 error = validate_lock_args(ls, lkb, args);
3336 error = _convert_lock(r, lkb);
3343 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3344 struct dlm_args *args)
3349 r = lkb->lkb_resource;
3354 error = validate_unlock_args(lkb, args);
3358 error = _unlock_lock(r, lkb);
3365 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3366 struct dlm_args *args)
3371 r = lkb->lkb_resource;
3376 error = validate_unlock_args(lkb, args);
3380 error = _cancel_lock(r, lkb);
3388 * Two stage 1 varieties: dlm_lock() and dlm_unlock()
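*
* A minimal dlm_lock() caller sketch (illustrative only; my_ast and
* my_bast stand for hypothetical caller-supplied callbacks):
*
*	struct dlm_lksb lksb;
*	int error;
*
*	error = dlm_lock(ls, DLM_LOCK_EX, &lksb, 0, "myres", 5, 0,
*			 my_ast, &lksb, my_bast);
*
* A return of 0 means the request was accepted; the final status
* arrives in lksb.sb_status when my_ast runs, and lksb.sb_lkid
* identifies the lock for later convert/unlock calls.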
3391 int dlm_lock(dlm_lockspace_t *lockspace,
3393 struct dlm_lksb *lksb,
3396 unsigned int namelen,
3397 uint32_t parent_lkid,
3398 void (*ast) (void *astarg),
3400 void (*bast) (void *astarg, int mode))
3403 struct dlm_lkb *lkb;
3404 struct dlm_args args;
3405 int error, convert = flags & DLM_LKF_CONVERT;
3407 ls = dlm_find_lockspace_local(lockspace);
3411 dlm_lock_recovery(ls);
3414 error = find_lkb(ls, lksb->sb_lkid, &lkb);
3416 error = create_lkb(ls, &lkb);
3421 trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
3423 #ifdef CONFIG_DLM_DEPRECATED_API
3424 error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
3425 astarg, bast, &args);
3427 error = set_lock_args(mode, lksb, flags, namelen, ast, astarg, bast,
3434 error = convert_lock(ls, lkb, &args);
3436 error = request_lock(ls, lkb, name, namelen, &args);
3438 if (error == -EINPROGRESS)
3441 trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error);
3443 if (convert || error)
3445 if (error == -EAGAIN || error == -EDEADLK)
3448 dlm_unlock_recovery(ls);
3449 dlm_put_lockspace(ls);
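/*
 * A minimal dlm_unlock() caller sketch (illustrative; lkid is the
 * value a prior dlm_lock() returned in lksb.sb_lkid):
 *
 *	error = dlm_unlock(ls, lkid, 0, &lksb, &lksb);
 *
 * A return of 0 means the unlock was accepted; completion arrives
 * through the lock's ast with lksb.sb_status set to -DLM_EUNLOCK
 * (or -DLM_ECANCEL when DLM_LKF_CANCEL was used).
 */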
3453 int dlm_unlock(dlm_lockspace_t *lockspace,
3456 struct dlm_lksb *lksb,
3460 struct dlm_lkb *lkb;
3461 struct dlm_args args;
3464 ls = dlm_find_lockspace_local(lockspace);
3468 dlm_lock_recovery(ls);
3470 error = find_lkb(ls, lkid, &lkb);
3474 trace_dlm_unlock_start(ls, lkb, flags);
3476 error = set_unlock_args(flags, astarg, &args);
3480 if (flags & DLM_LKF_CANCEL)
3481 error = cancel_lock(ls, lkb, &args);
3483 error = unlock_lock(ls, lkb, &args);
3485 if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
3487 if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
3490 trace_dlm_unlock_end(ls, lkb, flags, error);
3494 dlm_unlock_recovery(ls);
3495 dlm_put_lockspace(ls);
3500 * send/receive routines for remote operations and replies
3504 * send_request receive_request
3505 * send_convert receive_convert
3506 * send_unlock receive_unlock
3507 * send_cancel receive_cancel
3508 * send_grant receive_grant
3509 * send_bast receive_bast
3510 * send_lookup receive_lookup
3511 * send_remove receive_remove
3514 * receive_request_reply send_request_reply
3515 * receive_convert_reply send_convert_reply
3516 * receive_unlock_reply send_unlock_reply
3517 * receive_cancel_reply send_cancel_reply
3518 * receive_lookup_reply send_lookup_reply
3521 static int _create_message(struct dlm_ls *ls, int mb_len,
3522 int to_nodeid, int mstype,
3523 struct dlm_message **ms_ret,
3524 struct dlm_mhandle **mh_ret)
3526 struct dlm_message *ms;
3527 struct dlm_mhandle *mh;
3530 /* dlm_midcomms_get_mhandle gives us a message handle (mh) that we
3531 need to pass into dlm_midcomms_commit_mhandle and a message buffer
3532 (mb) that we write our data into */
3534 mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, GFP_NOFS, &mb);
3538 ms = (struct dlm_message *) mb;
3540 ms->m_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
3541 ms->m_header.u.h_lockspace = cpu_to_le32(ls->ls_global_id);
3542 ms->m_header.h_nodeid = cpu_to_le32(dlm_our_nodeid());
3543 ms->m_header.h_length = cpu_to_le16(mb_len);
3544 ms->m_header.h_cmd = DLM_MSG;
3546 ms->m_type = cpu_to_le32(mstype);
3553 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
3554 int to_nodeid, int mstype,
3555 struct dlm_message **ms_ret,
3556 struct dlm_mhandle **mh_ret)
3558 int mb_len = sizeof(struct dlm_message);
3561 case DLM_MSG_REQUEST:
3562 case DLM_MSG_LOOKUP:
3563 case DLM_MSG_REMOVE:
3564 mb_len += r->res_length;
3566 case DLM_MSG_CONVERT:
3567 case DLM_MSG_UNLOCK:
3568 case DLM_MSG_REQUEST_REPLY:
3569 case DLM_MSG_CONVERT_REPLY:
3571 if (lkb && lkb->lkb_lvbptr)
3572 mb_len += r->res_ls->ls_lvblen;
3576 return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
3580 /* further lowcomms enhancements or alternate implementations may make
3581 the return value from this function useful at some point */
3583 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
3585 dlm_midcomms_commit_mhandle(mh);
3589 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
3590 struct dlm_message *ms)
3592 ms->m_nodeid = cpu_to_le32(lkb->lkb_nodeid);
3593 ms->m_pid = cpu_to_le32(lkb->lkb_ownpid);
3594 ms->m_lkid = cpu_to_le32(lkb->lkb_id);
3595 ms->m_remid = cpu_to_le32(lkb->lkb_remid);
3596 ms->m_exflags = cpu_to_le32(lkb->lkb_exflags);
3597 ms->m_sbflags = cpu_to_le32(lkb->lkb_sbflags);
3598 ms->m_flags = cpu_to_le32(lkb->lkb_flags);
3599 ms->m_lvbseq = cpu_to_le32(lkb->lkb_lvbseq);
3600 ms->m_status = cpu_to_le32(lkb->lkb_status);
3601 ms->m_grmode = cpu_to_le32(lkb->lkb_grmode);
3602 ms->m_rqmode = cpu_to_le32(lkb->lkb_rqmode);
3603 ms->m_hash = cpu_to_le32(r->res_hash);
3605 /* m_result and m_bastmode are set from function args,
3606 not from lkb fields */
3608 if (lkb->lkb_bastfn)
3609 ms->m_asts |= cpu_to_le32(DLM_CB_BAST);
3611 ms->m_asts |= cpu_to_le32(DLM_CB_CAST);
3613 /* compare with switch in create_message; send_remove() doesn't
3616 switch (ms->m_type) {
3617 case cpu_to_le32(DLM_MSG_REQUEST):
3618 case cpu_to_le32(DLM_MSG_LOOKUP):
3619 memcpy(ms->m_extra, r->res_name, r->res_length);
3621 case cpu_to_le32(DLM_MSG_CONVERT):
3622 case cpu_to_le32(DLM_MSG_UNLOCK):
3623 case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3624 case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3625 case cpu_to_le32(DLM_MSG_GRANT):
3626 if (!lkb->lkb_lvbptr)
3628 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
3633 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3635 struct dlm_message *ms;
3636 struct dlm_mhandle *mh;
3637 int to_nodeid, error;
3639 to_nodeid = r->res_nodeid;
3641 error = add_to_waiters(lkb, mstype, to_nodeid);
3645 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3649 send_args(r, lkb, ms);
3651 error = send_message(mh, ms);
3657 remove_from_waiters(lkb, msg_reply_type(mstype));
3661 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3663 return send_common(r, lkb, DLM_MSG_REQUEST);
3666 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3670 error = send_common(r, lkb, DLM_MSG_CONVERT);
3672 /* down conversions go without a reply from the master */
3673 if (!error && down_conversion(lkb)) {
3674 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3675 r->res_ls->ls_stub_ms.m_flags = cpu_to_le32(DLM_IFL_STUB_MS);
3676 r->res_ls->ls_stub_ms.m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
3677 r->res_ls->ls_stub_ms.m_result = 0;
3678 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
3684 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
3685 MASTER_UNCERTAIN to force the next request on the rsb to confirm
3686 that the master is still correct. */
3688 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3690 return send_common(r, lkb, DLM_MSG_UNLOCK);
3693 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3695 return send_common(r, lkb, DLM_MSG_CANCEL);
3698 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3700 struct dlm_message *ms;
3701 struct dlm_mhandle *mh;
3702 int to_nodeid, error;
3704 to_nodeid = lkb->lkb_nodeid;
3706 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3710 send_args(r, lkb, ms);
3714 error = send_message(mh, ms);
3719 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3721 struct dlm_message *ms;
3722 struct dlm_mhandle *mh;
3723 int to_nodeid, error;
3725 to_nodeid = lkb->lkb_nodeid;
3727 error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3731 send_args(r, lkb, ms);
3733 ms->m_bastmode = cpu_to_le32(mode);
3735 error = send_message(mh, ms);
3740 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3742 struct dlm_message *ms;
3743 struct dlm_mhandle *mh;
3744 int to_nodeid, error;
3746 to_nodeid = dlm_dir_nodeid(r);
3748 error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3752 error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3756 send_args(r, lkb, ms);
3758 error = send_message(mh, ms);
3764 remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3768 static int send_remove(struct dlm_rsb *r)
3770 struct dlm_message *ms;
3771 struct dlm_mhandle *mh;
3772 int to_nodeid, error;
3774 to_nodeid = dlm_dir_nodeid(r);
3776 error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3780 memcpy(ms->m_extra, r->res_name, r->res_length);
3781 ms->m_hash = cpu_to_le32(r->res_hash);
3783 error = send_message(mh, ms);
3788 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3791 struct dlm_message *ms;
3792 struct dlm_mhandle *mh;
3793 int to_nodeid, error;
3795 to_nodeid = lkb->lkb_nodeid;
3797 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3801 send_args(r, lkb, ms);
3803 ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3805 error = send_message(mh, ms);
3810 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3812 return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3815 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3817 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3820 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3822 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3825 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3827 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3830 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3831 int ret_nodeid, int rv)
3833 struct dlm_rsb *r = &ls->ls_stub_rsb;
3834 struct dlm_message *ms;
3835 struct dlm_mhandle *mh;
3836 int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid);
3838 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3842 ms->m_lkid = ms_in->m_lkid;
3843 ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3844 ms->m_nodeid = cpu_to_le32(ret_nodeid);
3846 error = send_message(mh, ms);
3851 /* which args we save from a received message depends heavily on the type
3852 of message, unlike the send side where we can safely send everything about
3853 the lkb for any type of message */
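/* Only the low 16 bits of lkb_flags are carried in m_flags; the high
   16 bits are node-local state (MSTCPY, RESEND, the overlap flags),
   so the receive functions below keep those and replace just the low
   half. */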
3855 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3857 lkb->lkb_exflags = le32_to_cpu(ms->m_exflags);
3858 lkb->lkb_sbflags = le32_to_cpu(ms->m_sbflags);
3859 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3860 (le32_to_cpu(ms->m_flags) & 0x0000FFFF);
3863 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3865 if (ms->m_flags == cpu_to_le32(DLM_IFL_STUB_MS))
3868 lkb->lkb_sbflags = le32_to_cpu(ms->m_sbflags);
3869 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3870 (le32_to_cpu(ms->m_flags) & 0x0000FFFF);
3873 static int receive_extralen(struct dlm_message *ms)
3875 return (le16_to_cpu(ms->m_header.h_length) -
3876 sizeof(struct dlm_message));
3879 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3880 struct dlm_message *ms)
3884 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3885 if (!lkb->lkb_lvbptr)
3886 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3887 if (!lkb->lkb_lvbptr)
3889 len = receive_extralen(ms);
3890 if (len > ls->ls_lvblen)
3891 len = ls->ls_lvblen;
3892 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3897 static void fake_bastfn(void *astparam, int mode)
3899 log_print("fake_bastfn should not be called");
3902 static void fake_astfn(void *astparam)
3904 log_print("fake_astfn should not be called");
3907 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3908 struct dlm_message *ms)
3910 lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3911 lkb->lkb_ownpid = le32_to_cpu(ms->m_pid);
3912 lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3913 lkb->lkb_grmode = DLM_LOCK_IV;
3914 lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3916 lkb->lkb_bastfn = (ms->m_asts & cpu_to_le32(DLM_CB_BAST)) ? &fake_bastfn : NULL;
3917 lkb->lkb_astfn = (ms->m_asts & cpu_to_le32(DLM_CB_CAST)) ? &fake_astfn : NULL;
3919 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3920 /* lkb was just created so there won't be an lvb yet */
3921 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3922 if (!lkb->lkb_lvbptr)
3929 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3930 struct dlm_message *ms)
3932 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3935 if (receive_lvb(ls, lkb, ms))
3938 lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3939 lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
3944 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3945 struct dlm_message *ms)
3947 if (receive_lvb(ls, lkb, ms))
3952 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3953 uses to send a reply and that the remote end uses to process the reply. */
3955 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3957 struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3958 lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3959 lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3962 /* This is called after the rsb is locked so that we can safely inspect
3963 fields in the lkb. */
3965 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3967 int from = le32_to_cpu(ms->m_header.h_nodeid);
3970 /* currently mixing of user/kernel locks is not supported */
3971 if (ms->m_flags & cpu_to_le32(DLM_IFL_USER) &&
3972 ~lkb->lkb_flags & DLM_IFL_USER) {
3973 log_error(lkb->lkb_resource->res_ls,
3974 "got user dlm message for a kernel lock");
3979 switch (ms->m_type) {
3980 case cpu_to_le32(DLM_MSG_CONVERT):
3981 case cpu_to_le32(DLM_MSG_UNLOCK):
3982 case cpu_to_le32(DLM_MSG_CANCEL):
3983 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3987 case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3988 case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
3989 case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
3990 case cpu_to_le32(DLM_MSG_GRANT):
3991 case cpu_to_le32(DLM_MSG_BAST):
3992 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3996 case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3997 if (!is_process_copy(lkb))
3999 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
4009 log_error(lkb->lkb_resource->res_ls,
4010 "ignore invalid message %d from %d %x %x %x %d",
4011 le32_to_cpu(ms->m_type), from, lkb->lkb_id,
4012 lkb->lkb_remid, lkb->lkb_flags, lkb->lkb_nodeid);
4016 static void send_repeat_remove(struct dlm_ls *ls, char *ms_name, int len)
4018 char name[DLM_RESNAME_MAXLEN + 1];
4019 struct dlm_message *ms;
4020 struct dlm_mhandle *mh;
4025 memset(name, 0, sizeof(name));
4026 memcpy(name, ms_name, len);
4028 hash = jhash(name, len, 0);
4029 b = hash & (ls->ls_rsbtbl_size - 1);
4031 dir_nodeid = dlm_hash2nodeid(ls, hash);
4033 log_error(ls, "send_repeat_remove dir %d %s", dir_nodeid, name);
4035 spin_lock(&ls->ls_rsbtbl[b].lock);
4036 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
4038 spin_unlock(&ls->ls_rsbtbl[b].lock);
4039 log_error(ls, "repeat_remove on keep %s", name);
4043 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
4045 spin_unlock(&ls->ls_rsbtbl[b].lock);
4046 log_error(ls, "repeat_remove on toss %s", name);
4050 /* use ls->remove_name2 to avoid conflict with shrink? */
4052 spin_lock(&ls->ls_remove_spin);
4053 ls->ls_remove_len = len;
4054 memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
4055 spin_unlock(&ls->ls_remove_spin);
4056 spin_unlock(&ls->ls_rsbtbl[b].lock);
4058 rv = _create_message(ls, sizeof(struct dlm_message) + len,
4059 dir_nodeid, DLM_MSG_REMOVE, &ms, &mh);
4063 memcpy(ms->m_extra, name, len);
4064 ms->m_hash = cpu_to_le32(hash);
4066 send_message(mh, ms);
4069 spin_lock(&ls->ls_remove_spin);
4070 ls->ls_remove_len = 0;
4071 memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
4072 spin_unlock(&ls->ls_remove_spin);
4073 wake_up(&ls->ls_remove_wait);
4076 static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
4078 struct dlm_lkb *lkb;
4081 int error, namelen = 0;
4083 from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4085 error = create_lkb(ls, &lkb);
4089 receive_flags(lkb, ms);
4090 lkb->lkb_flags |= DLM_IFL_MSTCPY;
4091 error = receive_request_args(ls, lkb, ms);
4097 /* The dir node is the authority on whether we are the master
4098 for this rsb or not, so if the master sends us a request, we should
4099 recreate the rsb if we've destroyed it. This race happens when we
4100 send a remove message to the dir node at the same time that the dir
4101 node sends us a request for the rsb. */
4103 namelen = receive_extralen(ms);
4105 error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
4106 R_RECEIVE_REQUEST, &r);
4114 if (r->res_master_nodeid != dlm_our_nodeid()) {
4115 error = validate_master_nodeid(ls, r, from_nodeid);
4125 error = do_request(r, lkb);
4126 send_request_reply(r, lkb, error);
4127 do_request_effects(r, lkb, error);
4132 if (error == -EINPROGRESS)
4139 /* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
4140 and do this receive_request again from process_lookup_list once
4141 we get the lookup reply. This would avoid many repeated
4142 ENOTBLK request failures when the lookup reply designating us
4143 as master is delayed. */
4145 /* We could repeatedly return -EBADR here if our send_remove() is
4146 delayed in being sent/arriving/being processed on the dir node.
4147 Another node would repeatedly look up the master, and the dir
4148 node would continue returning our nodeid until our send_remove
4151 We send another remove message in case our previous send_remove
4152 was lost/ignored/missed somehow. */
4154 if (error != -ENOTBLK) {
4155 log_limit(ls, "receive_request %x from %d %d",
4156 le32_to_cpu(ms->m_lkid), from_nodeid, error);
4159 if (namelen && error == -EBADR) {
4160 send_repeat_remove(ls, ms->m_extra, namelen);
4164 setup_stub_lkb(ls, ms);
4165 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4169 static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
4171 struct dlm_lkb *lkb;
4173 int error, reply = 1;
4175 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4179 if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
4180 log_error(ls, "receive_convert %x remid %x recover_seq %llu "
4181 "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
4182 (unsigned long long)lkb->lkb_recover_seq,
4183 le32_to_cpu(ms->m_header.h_nodeid),
4184 le32_to_cpu(ms->m_lkid));
4190 r = lkb->lkb_resource;
4195 error = validate_message(lkb, ms);
4199 receive_flags(lkb, ms);
4201 error = receive_convert_args(ls, lkb, ms);
4203 send_convert_reply(r, lkb, error);
4207 reply = !down_conversion(lkb);
4209 error = do_convert(r, lkb);
4211 send_convert_reply(r, lkb, error);
4212 do_convert_effects(r, lkb, error);
4220 setup_stub_lkb(ls, ms);
4221 send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4225 static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
4227 struct dlm_lkb *lkb;
4231 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4235 if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
4236 log_error(ls, "receive_unlock %x remid %x remote %d %x",
4237 lkb->lkb_id, lkb->lkb_remid,
4238 le32_to_cpu(ms->m_header.h_nodeid),
4239 le32_to_cpu(ms->m_lkid));
4245 r = lkb->lkb_resource;
4250 error = validate_message(lkb, ms);
4254 receive_flags(lkb, ms);
4256 error = receive_unlock_args(ls, lkb, ms);
4258 send_unlock_reply(r, lkb, error);
4262 error = do_unlock(r, lkb);
4263 send_unlock_reply(r, lkb, error);
4264 do_unlock_effects(r, lkb, error);
4272 setup_stub_lkb(ls, ms);
4273 send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4277 static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
4279 struct dlm_lkb *lkb;
4283 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4287 receive_flags(lkb, ms);
4289 r = lkb->lkb_resource;
4294 error = validate_message(lkb, ms);
4298 error = do_cancel(r, lkb);
4299 send_cancel_reply(r, lkb, error);
4300 do_cancel_effects(r, lkb, error);
4308 setup_stub_lkb(ls, ms);
4309 send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4313 static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
4315 struct dlm_lkb *lkb;
4319 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4323 r = lkb->lkb_resource;
4328 error = validate_message(lkb, ms);
4332 receive_flags_reply(lkb, ms);
4333 if (is_altmode(lkb))
4334 munge_altmode(lkb, ms);
4335 grant_lock_pc(r, lkb, ms);
4336 queue_cast(r, lkb, 0);
4344 static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
4346 struct dlm_lkb *lkb;
4350 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4354 r = lkb->lkb_resource;
4359 error = validate_message(lkb, ms);
4363 queue_bast(r, lkb, le32_to_cpu(ms->m_bastmode));
4364 lkb->lkb_highbast = le32_to_cpu(ms->m_bastmode);
4372 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
4374 int len, error, ret_nodeid, from_nodeid, our_nodeid;
4376 from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4377 our_nodeid = dlm_our_nodeid();
4379 len = receive_extralen(ms);
4381 error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
4384 /* Optimization: we're master so treat lookup as a request */
4385 if (!error && ret_nodeid == our_nodeid) {
4386 receive_request(ls, ms);
4389 send_lookup_reply(ls, ms, ret_nodeid, error);
4392 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
4394 char name[DLM_RESNAME_MAXLEN+1];
4397 int rv, len, dir_nodeid, from_nodeid;
4399 from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4401 len = receive_extralen(ms);
4403 if (len > DLM_RESNAME_MAXLEN) {
4404 log_error(ls, "receive_remove from %d bad len %d",
4409 dir_nodeid = dlm_hash2nodeid(ls, le32_to_cpu(ms->m_hash));
4410 if (dir_nodeid != dlm_our_nodeid()) {
4411 log_error(ls, "receive_remove from %d bad nodeid %d",
4412 from_nodeid, dir_nodeid);
4416 /* Look for name on rsbtbl.toss; if it's there, kill it.
4417 If it's on rsbtbl.keep, it's being used, and we should ignore this
4418 message. This is an expected race between the dir node sending a
4419 request to the master node at the same time as the master node sends
4420 a remove to the dir node. The resolution to that race is for the
4421 dir node to ignore the remove message, and the master node to
4422 recreate the master rsb when it gets a request from the dir node for
4423 an rsb it doesn't have. */
4425 memset(name, 0, sizeof(name));
4426 memcpy(name, ms->m_extra, len);
4428 hash = jhash(name, len, 0);
4429 b = hash & (ls->ls_rsbtbl_size - 1);
4431 spin_lock(&ls->ls_rsbtbl[b].lock);
4433 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
4435 /* verify the rsb is on keep list per comment above */
4436 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
4438 /* should not happen */
4439 log_error(ls, "receive_remove from %d not found %s",
4441 spin_unlock(&ls->ls_rsbtbl[b].lock);
4444 if (r->res_master_nodeid != from_nodeid) {
4445 /* should not happen */
4446 log_error(ls, "receive_remove keep from %d master %d",
4447 from_nodeid, r->res_master_nodeid);
4449 spin_unlock(&ls->ls_rsbtbl[b].lock);
4453 log_debug(ls, "receive_remove from %d master %d first %x %s",
4454 from_nodeid, r->res_master_nodeid, r->res_first_lkid,
4456 spin_unlock(&ls->ls_rsbtbl[b].lock);
4460 if (r->res_master_nodeid != from_nodeid) {
4461 log_error(ls, "receive_remove toss from %d master %d",
4462 from_nodeid, r->res_master_nodeid);
4464 spin_unlock(&ls->ls_rsbtbl[b].lock);
4468 if (kref_put(&r->res_ref, kill_rsb)) {
4469 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
4470 spin_unlock(&ls->ls_rsbtbl[b].lock);
4473 log_error(ls, "receive_remove from %d rsb ref error",
4476 spin_unlock(&ls->ls_rsbtbl[b].lock);
4480 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
4482 do_purge(ls, le32_to_cpu(ms->m_nodeid), le32_to_cpu(ms->m_pid));
4485 static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
4487 struct dlm_lkb *lkb;
4489 int error, mstype, result;
4490 int from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4492 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4496 r = lkb->lkb_resource;
4500 error = validate_message(lkb, ms);
4504 mstype = lkb->lkb_wait_type;
4505 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
4507 log_error(ls, "receive_request_reply %x remote %d %x result %d",
4508 lkb->lkb_id, from_nodeid, le32_to_cpu(ms->m_lkid),
4509 from_dlm_errno(le32_to_cpu(ms->m_result)));
4514 /* Optimization: the dir node was also the master, so it took our
4515 lookup as a request and sent request reply instead of lookup reply */
4516 if (mstype == DLM_MSG_LOOKUP) {
4517 r->res_master_nodeid = from_nodeid;
4518 r->res_nodeid = from_nodeid;
4519 lkb->lkb_nodeid = from_nodeid;
4522 /* this is the value returned from do_request() on the master */
4523 result = from_dlm_errno(le32_to_cpu(ms->m_result));
4527 /* request would block (be queued) on remote master */
4528 queue_cast(r, lkb, -EAGAIN);
4529 confirm_master(r, -EAGAIN);
4530 unhold_lkb(lkb); /* undoes create_lkb() */
4535 /* request was queued or granted on remote master */
4536 receive_flags_reply(lkb, ms);
4537 lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
4538 if (is_altmode(lkb))
4539 munge_altmode(lkb, ms);
4541 add_lkb(r, lkb, DLM_LKSTS_WAITING);
4544 grant_lock_pc(r, lkb, ms);
4545 queue_cast(r, lkb, 0);
4547 confirm_master(r, result);
4552 /* find_rsb failed to find rsb or rsb wasn't master */
4553 log_limit(ls, "receive_request_reply %x from %d %d "
4554 "master %d dir %d first %x %s", lkb->lkb_id,
4555 from_nodeid, result, r->res_master_nodeid,
4556 r->res_dir_nodeid, r->res_first_lkid, r->res_name);
4558 if (r->res_dir_nodeid != dlm_our_nodeid() &&
4559 r->res_master_nodeid != dlm_our_nodeid()) {
4560 /* cause _request_lock->set_master->send_lookup */
4561 r->res_master_nodeid = 0;
4563 lkb->lkb_nodeid = -1;
4566 if (is_overlap(lkb)) {
4567 /* we'll ignore error in cancel/unlock reply */
4568 queue_cast_overlap(r, lkb);
4569 confirm_master(r, result);
4570 unhold_lkb(lkb); /* undoes create_lkb() */
4572 _request_lock(r, lkb);
4574 if (r->res_master_nodeid == dlm_our_nodeid())
4575 confirm_master(r, 0);
4580 log_error(ls, "receive_request_reply %x error %d",
4581 lkb->lkb_id, result);
4584 if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
4585 log_debug(ls, "receive_request_reply %x result %d unlock",
4586 lkb->lkb_id, result);
4587 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4588 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4589 send_unlock(r, lkb);
4590 } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
4591 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
4592 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4593 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4594 send_cancel(r, lkb);
4596 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4597 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4606 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4607 struct dlm_message *ms)
4609 /* this is the value returned from do_convert() on the master */
4610 switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4612 /* convert would block (be queued) on remote master */
4613 queue_cast(r, lkb, -EAGAIN);
4617 receive_flags_reply(lkb, ms);
4618 revert_lock_pc(r, lkb);
4619 queue_cast(r, lkb, -EDEADLK);
4623 /* convert was queued on remote master */
4624 receive_flags_reply(lkb, ms);
4625 if (is_demoted(lkb))
4628 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
4633 /* convert was granted on remote master */
4634 receive_flags_reply(lkb, ms);
4635 if (is_demoted(lkb))
4637 grant_lock_pc(r, lkb, ms);
4638 queue_cast(r, lkb, 0);
4642 log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
4643 lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4644 le32_to_cpu(ms->m_lkid),
4645 from_dlm_errno(le32_to_cpu(ms->m_result)));
4651 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4653 struct dlm_rsb *r = lkb->lkb_resource;
4659 error = validate_message(lkb, ms);
4663 /* stub reply can happen with waiters_mutex held */
4664 error = remove_from_waiters_ms(lkb, ms);
4668 __receive_convert_reply(r, lkb, ms);
4674 static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
4676 struct dlm_lkb *lkb;
4679 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4683 _receive_convert_reply(lkb, ms);
4688 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4690 struct dlm_rsb *r = lkb->lkb_resource;
4696 error = validate_message(lkb, ms);
4700 /* stub reply can happen with waiters_mutex held */
4701 error = remove_from_waiters_ms(lkb, ms);
4705 /* this is the value returned from do_unlock() on the master */
4707 switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4709 receive_flags_reply(lkb, ms);
4710 remove_lock_pc(r, lkb);
4711 queue_cast(r, lkb, -DLM_EUNLOCK);
4716 log_error(r->res_ls, "receive_unlock_reply %x error %d",
4717 lkb->lkb_id, from_dlm_errno(le32_to_cpu(ms->m_result)));
4724 static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
4726 struct dlm_lkb *lkb;
4729 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4733 _receive_unlock_reply(lkb, ms);
4738 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4740 struct dlm_rsb *r = lkb->lkb_resource;
4746 error = validate_message(lkb, ms);
4750 /* stub reply can happen with waiters_mutex held */
4751 error = remove_from_waiters_ms(lkb, ms);
4755 /* this is the value returned from do_cancel() on the master */
4757 switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4759 receive_flags_reply(lkb, ms);
4760 revert_lock_pc(r, lkb);
4761 queue_cast(r, lkb, -DLM_ECANCEL);
4766 log_error(r->res_ls, "receive_cancel_reply %x error %d",
4768 from_dlm_errno(le32_to_cpu(ms->m_result)));
4775 static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
4777 struct dlm_lkb *lkb;
4780 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4784 _receive_cancel_reply(lkb, ms);
4789 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
4791 struct dlm_lkb *lkb;
4793 int error, ret_nodeid;
4794 int do_lookup_list = 0;
4796 error = find_lkb(ls, le32_to_cpu(ms->m_lkid), &lkb);
4798 log_error(ls, "%s no lkid %x", __func__,
4799 le32_to_cpu(ms->m_lkid));
4803 /* ms->m_result is the value returned by dlm_master_lookup on dir node
4804 FIXME: will a non-zero error ever be returned? */
4806 r = lkb->lkb_resource;
4810 error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
4814 ret_nodeid = le32_to_cpu(ms->m_nodeid);
4816 /* We sometimes receive a request from the dir node for this
4817 rsb before we've received the dir node's lookup_reply for it.
4818 The request from the dir node implies we're the master, so we set
4819 ourself as master in receive_request_reply, and verify here that
4820 we are indeed the master. */
4822 if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
4823 /* This should never happen */
4824 log_error(ls, "receive_lookup_reply %x from %d ret %d "
4825 "master %d dir %d our %d first %x %s",
4826 lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4827 ret_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
4828 dlm_our_nodeid(), r->res_first_lkid, r->res_name);
4831 if (ret_nodeid == dlm_our_nodeid()) {
4832 r->res_master_nodeid = ret_nodeid;
4835 r->res_first_lkid = 0;
4836 } else if (ret_nodeid == -1) {
4837 /* the remote node doesn't believe it's the dir node */
4838 log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
4839 lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid));
4840 r->res_master_nodeid = 0;
4842 lkb->lkb_nodeid = -1;
4844 /* set_master() will set lkb_nodeid from r */
4845 r->res_master_nodeid = ret_nodeid;
4846 r->res_nodeid = ret_nodeid;
4849 if (is_overlap(lkb)) {
4850 log_debug(ls, "receive_lookup_reply %x unlock %x",
4851 lkb->lkb_id, lkb->lkb_flags);
4852 queue_cast_overlap(r, lkb);
4853 unhold_lkb(lkb); /* undoes create_lkb() */
4857 _request_lock(r, lkb);
4861 process_lookup_list(r);
4868 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4871 int error = 0, noent = 0;
4873 if (!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid))) {
4874 log_limit(ls, "receive %d from non-member %d %x %x %d",
4875 le32_to_cpu(ms->m_type),
4876 le32_to_cpu(ms->m_header.h_nodeid),
4877 le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4878 from_dlm_errno(le32_to_cpu(ms->m_result)));
4882 switch (ms->m_type) {
4884 /* messages sent to a master node */
4886 case cpu_to_le32(DLM_MSG_REQUEST):
4887 error = receive_request(ls, ms);
4890 case cpu_to_le32(DLM_MSG_CONVERT):
4891 error = receive_convert(ls, ms);
4894 case cpu_to_le32(DLM_MSG_UNLOCK):
4895 error = receive_unlock(ls, ms);
4898 case cpu_to_le32(DLM_MSG_CANCEL):
4900 error = receive_cancel(ls, ms);
4903 /* messages sent from a master node (replies to above) */
4905 case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
4906 error = receive_request_reply(ls, ms);
4909 case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
4910 error = receive_convert_reply(ls, ms);
4913 case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
4914 error = receive_unlock_reply(ls, ms);
4917 case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
4918 error = receive_cancel_reply(ls, ms);
4921 /* messages sent from a master node (only two types of async msg) */
4923 case cpu_to_le32(DLM_MSG_GRANT):
4925 error = receive_grant(ls, ms);
4928 case cpu_to_le32(DLM_MSG_BAST):
4930 error = receive_bast(ls, ms);
4933 /* messages sent to a dir node */
4935 case cpu_to_le32(DLM_MSG_LOOKUP):
4936 receive_lookup(ls, ms);
4939 case cpu_to_le32(DLM_MSG_REMOVE):
4940 receive_remove(ls, ms);
4943 /* messages sent from a dir node (remove has no reply) */
4945 case cpu_to_le32(DLM_MSG_LOOKUP_REPLY):
4946 receive_lookup_reply(ls, ms);
4949 /* other messages */
4951 case cpu_to_le32(DLM_MSG_PURGE):
4952 receive_purge(ls, ms);
4956 log_error(ls, "unknown message type %d",
4957 le32_to_cpu(ms->m_type));
4961 * When checking for ENOENT, we're checking the result of
4962 * find_lkb(m_remid):
4964 * The lock id referenced in the message wasn't found. This may
4965 * happen in normal usage for the async messages and cancel, so
4966 * only use log_debug for them.
4968 * Some errors are expected and normal.
4971 if (error == -ENOENT && noent) {
4972 log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
4973 le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4974 le32_to_cpu(ms->m_header.h_nodeid),
4975 le32_to_cpu(ms->m_lkid), saved_seq);
4976 } else if (error == -ENOENT) {
4977 log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
4978 le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4979 le32_to_cpu(ms->m_header.h_nodeid),
4980 le32_to_cpu(ms->m_lkid), saved_seq);
4982 if (ms->m_type == cpu_to_le32(DLM_MSG_CONVERT))
4983 dlm_dump_rsb_hash(ls, le32_to_cpu(ms->m_hash));
4986 if (error == -EINVAL) {
4987 log_error(ls, "receive %d inval from %d lkid %x remid %x "
4989 le32_to_cpu(ms->m_type),
4990 le32_to_cpu(ms->m_header.h_nodeid),
4991 le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4996 /* If the lockspace is in recovery mode (locking stopped), then normal
4997 messages are saved on the requestqueue for processing after recovery is
4998 done. When not in recovery mode, we wait for dlm_recoverd to drain saved
4999 messages off the requestqueue before we process new ones. This occurs right
5000 after recovery completes when we transition from saving all messages on
5001 requestqueue, to processing all the saved messages, to processing new
5002 messages as they arrive. */
5004 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
5007 if (dlm_locking_stopped(ls)) {
5008 /* If we were a member of this lockspace, left, and rejoined,
5009 other nodes may still be sending us messages from the
5010 lockspace generation before we left. */
5011 if (!ls->ls_generation) {
5012 log_limit(ls, "receive %d from %d ignore old gen",
5013 le32_to_cpu(ms->m_type), nodeid);
5017 dlm_add_requestqueue(ls, nodeid, ms);
5019 dlm_wait_requestqueue(ls);
5020 _receive_message(ls, ms, 0);
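#if 0
/* Editor's sketch of the save-or-process decision described in the
   comment above dlm_receive_message() (condensed form; all functions
   named here appear verbatim in this file). */
static void example_receive(struct dlm_ls *ls, struct dlm_message *ms,
			    int nodeid)
{
	if (dlm_locking_stopped(ls)) {
		/* recovery running: park the message for dlm_recoverd */
		dlm_add_requestqueue(ls, nodeid, ms);
	} else {
		/* let dlm_recoverd drain saved messages first so message
		   ordering is preserved, then process directly */
		dlm_wait_requestqueue(ls);
		_receive_message(ls, ms, 0);
	}
}
#endif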
5024 /* This is called by dlm_recoverd to process messages that were saved on
5025 the requestqueue. */
5027 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
5030 _receive_message(ls, ms, saved_seq);
5033 /* This is called by the midcomms layer when something is received for
5034 the lockspace. It could be either a MSG (normal message sent as part of
5035 standard locking activity) or an RCOM (recovery message sent as part of
5036 lockspace recovery). */
5038 void dlm_receive_buffer(union dlm_packet *p, int nodeid)
5040 struct dlm_header *hd = &p->header;
5044 switch (hd->h_cmd) {
5046 type = le32_to_cpu(p->message.m_type);
5049 type = le32_to_cpu(p->rcom.rc_type);
5052 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
5056 if (le32_to_cpu(hd->h_nodeid) != nodeid) {
5057 log_print("invalid h_nodeid %d from %d lockspace %x",
5058 le32_to_cpu(hd->h_nodeid), nodeid,
5059 le32_to_cpu(hd->u.h_lockspace));
5063 ls = dlm_find_lockspace_global(le32_to_cpu(hd->u.h_lockspace));
5065 if (dlm_config.ci_log_debug) {
5066 printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
5067 "%u from %d cmd %d type %d\n",
5068 le32_to_cpu(hd->u.h_lockspace), nodeid,
5072 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
5073 dlm_send_ls_not_ready(nodeid, &p->rcom);
5077 /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
5078 be inactive (in this ls) before transitioning to recovery mode */
5080 down_read(&ls->ls_recv_active);
5081 if (hd->h_cmd == DLM_MSG)
5082 dlm_receive_message(ls, &p->message, nodeid);
5084 dlm_receive_rcom(ls, &p->rcom, nodeid);
5085 up_read(&ls->ls_recv_active);
5087 dlm_put_lockspace(ls);
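/*
 * Editor's note (summary, not in the original source): dlm_receive_buffer()
 * validates in this order before dispatching: h_cmd must be DLM_MSG or
 * DLM_RCOM; the header's h_nodeid must match the connection's nodeid; the
 * global lockspace id must resolve via dlm_find_lockspace_global(); and
 * processing runs under down_read(&ls->ls_recv_active) so dlm_ls_stop()
 * can wait out all receivers before recovery begins.
 */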
5090 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
5091 struct dlm_message *ms_stub)
5093 if (middle_conversion(lkb)) {
5095 memset(ms_stub, 0, sizeof(struct dlm_message));
5096 ms_stub->m_flags = cpu_to_le32(DLM_IFL_STUB_MS);
5097 ms_stub->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
5098 ms_stub->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS));
5099 ms_stub->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5100 _receive_convert_reply(lkb, ms_stub);
5102 /* Same special case as in receive_rcom_lock_args() */
5103 lkb->lkb_grmode = DLM_LOCK_IV;
5104 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
5107 } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
5108 lkb->lkb_flags |= DLM_IFL_RESEND;
5111 /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
5112 conversions are async; there's no reply from the remote master */
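#if 0
/* Editor's sketch: PR and CW are "middle" modes of equal rank, so a
   PR<->CW conversion is neither clearly up nor down and cannot be
   resolved by a stub reply; it must wait for recover_conversion().
   Hypothetical helper mirroring the middle_conversion() check used in
   recover_convert_waiter() above. */
static int example_is_middle_conversion(int grmode, int rqmode)
{
	return (grmode == DLM_LOCK_PR && rqmode == DLM_LOCK_CW) ||
	       (grmode == DLM_LOCK_CW && rqmode == DLM_LOCK_PR);
}
#endif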
5115 /* A waiting lkb needs recovery if the master node has failed, or
5116 the master node is changing (only when no directory is used) */
5118 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
5121 if (dlm_no_directory(ls))
5124 if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
5130 /* Recovery for locks that are waiting for replies from nodes that are now
5131 gone. We can just complete unlocks and cancels by faking a reply from the
5132 dead node. Requests and up-conversions we flag to be resent after
5133 recovery. Down-conversions can just be completed with a fake reply like
5134 unlocks. Conversions between PR and CW need special attention. */
5136 void dlm_recover_waiters_pre(struct dlm_ls *ls)
5138 struct dlm_lkb *lkb, *safe;
5139 struct dlm_message *ms_stub;
5140 int wait_type, stub_unlock_result, stub_cancel_result;
5143 ms_stub = kmalloc(sizeof(*ms_stub), GFP_KERNEL);
5147 mutex_lock(&ls->ls_waiters_mutex);
5149 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
5151 dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
5153 /* exclude debug messages about unlocks because there can be so
5154 many and they aren't very interesting */
5156 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
5157 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5158 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
5162 lkb->lkb_resource->res_nodeid,
5164 lkb->lkb_wait_nodeid,
5168 /* all outstanding lookups, regardless of destination, will be
5169 resent after recovery is done */
5171 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
5172 lkb->lkb_flags |= DLM_IFL_RESEND;
5176 if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
5179 wait_type = lkb->lkb_wait_type;
5180 stub_unlock_result = -DLM_EUNLOCK;
5181 stub_cancel_result = -DLM_ECANCEL;
5183 /* Main reply may have been received leaving a zero wait_type,
5184 but a reply for the overlapping op may not have been
5185 received. In that case we need to fake the appropriate
5186 reply for the overlap op. */
5189 if (is_overlap_cancel(lkb)) {
5190 wait_type = DLM_MSG_CANCEL;
5191 if (lkb->lkb_grmode == DLM_LOCK_IV)
5192 stub_cancel_result = 0;
5194 if (is_overlap_unlock(lkb)) {
5195 wait_type = DLM_MSG_UNLOCK;
5196 if (lkb->lkb_grmode == DLM_LOCK_IV)
5197 stub_unlock_result = -ENOENT;
5200 log_debug(ls, "rwpre overlap %x %x %d %d %d",
5201 lkb->lkb_id, lkb->lkb_flags, wait_type,
5202 stub_cancel_result, stub_unlock_result);
5205 switch (wait_type) {
5207 case DLM_MSG_REQUEST:
5208 lkb->lkb_flags |= DLM_IFL_RESEND;
5211 case DLM_MSG_CONVERT:
5212 recover_convert_waiter(ls, lkb, ms_stub);
5215 case DLM_MSG_UNLOCK:
5217 memset(ms_stub, 0, sizeof(struct dlm_message));
5218 ms_stub->m_flags = cpu_to_le32(DLM_IFL_STUB_MS);
5219 ms_stub->m_type = cpu_to_le32(DLM_MSG_UNLOCK_REPLY);
5220 ms_stub->m_result = cpu_to_le32(to_dlm_errno(stub_unlock_result));
5221 ms_stub->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5222 _receive_unlock_reply(lkb, ms_stub);
5226 case DLM_MSG_CANCEL:
5228 memset(ms_stub, 0, sizeof(struct dlm_message));
5229 ms_stub->m_flags = cpu_to_le32(DLM_IFL_STUB_MS);
5230 ms_stub->m_type = cpu_to_le32(DLM_MSG_CANCEL_REPLY);
5231 ms_stub->m_result = cpu_to_le32(to_dlm_errno(stub_cancel_result));
5232 ms_stub->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5233 _receive_cancel_reply(lkb, ms_stub);
5238 log_error(ls, "invalid lkb wait_type %d %d",
5239 lkb->lkb_wait_type, wait_type);
5243 mutex_unlock(&ls->ls_waiters_mutex);
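#if 0
/* Editor's sketch: the stub replies built in dlm_recover_waiters_pre()
   above (and in recover_convert_waiter()) share this shape; a
   hypothetical helper, using only identifiers that appear in this
   file. */
static void example_fake_reply(struct dlm_message *ms_stub, __le32 type,
			       int result, int nodeid)
{
	memset(ms_stub, 0, sizeof(struct dlm_message));
	ms_stub->m_flags = cpu_to_le32(DLM_IFL_STUB_MS);
	ms_stub->m_type = type;
	ms_stub->m_result = cpu_to_le32(to_dlm_errno(result));
	ms_stub->m_header.h_nodeid = cpu_to_le32(nodeid);
}
#endif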
5247 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
5249 struct dlm_lkb *lkb = NULL, *iter;
5251 mutex_lock(&ls->ls_waiters_mutex);
5252 list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) {
5253 if (iter->lkb_flags & DLM_IFL_RESEND) {
5259 mutex_unlock(&ls->ls_waiters_mutex);
5264 /* Deal with lookups and lkb's marked RESEND from _pre. We may now be the
5265 master or dir-node for r. Processing the lkb may result in it being placed back on waiters. */
5268 /* We do this after normal locking has been enabled and any saved messages
5269 (in requestqueue) have been processed. We should be confident that at
5270 this point we won't get or process a reply to any of these waiting
5271 operations. But, new ops may be coming in on the rsbs/locks here from
5272 userspace or remotely. */
5274 /* there may have been an overlap unlock/cancel prior to recovery or after
5275 recovery. if before, the lkb may still have a positive wait_count; if after, the
5276 overlap flag would just have been set and nothing new sent. we can be
5277 confident here that any replies to either the initial op or overlap ops
5278 prior to recovery have been received. */
5280 int dlm_recover_waiters_post(struct dlm_ls *ls)
5282 struct dlm_lkb *lkb;
5284 int error = 0, mstype, err, oc, ou;
5287 if (dlm_locking_stopped(ls)) {
5288 log_debug(ls, "recover_waiters_post aborted");
5293 lkb = find_resend_waiter(ls);
5297 r = lkb->lkb_resource;
5301 mstype = lkb->lkb_wait_type;
5302 oc = is_overlap_cancel(lkb);
5303 ou = is_overlap_unlock(lkb);
5306 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5307 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
5308 "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
5309 r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
5310 dlm_dir_nodeid(r), oc, ou);
5312 /* At this point we assume that we won't get a reply to any
5313 previous op or overlap op on this lock. First, do a big
5314 remove_from_waiters() for all previous ops. */
5316 lkb->lkb_flags &= ~DLM_IFL_RESEND;
5317 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
5318 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
5319 lkb->lkb_wait_type = 0;
5320 /* drop all wait_count references; we still
5321 * hold a reference for this iteration. */
5323 while (lkb->lkb_wait_count) {
5324 lkb->lkb_wait_count--;
5327 mutex_lock(&ls->ls_waiters_mutex);
5328 list_del_init(&lkb->lkb_wait_reply);
5329 mutex_unlock(&ls->ls_waiters_mutex);
5332 /* do an unlock or cancel instead of resending */
5334 case DLM_MSG_LOOKUP:
5335 case DLM_MSG_REQUEST:
5336 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
5338 unhold_lkb(lkb); /* undoes create_lkb() */
5340 case DLM_MSG_CONVERT:
5342 queue_cast(r, lkb, -DLM_ECANCEL);
5344 lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
5345 _unlock_lock(r, lkb);
5353 case DLM_MSG_LOOKUP:
5354 case DLM_MSG_REQUEST:
5355 _request_lock(r, lkb);
5357 confirm_master(r, 0);
5359 case DLM_MSG_CONVERT:
5360 _convert_lock(r, lkb);
5368 log_error(ls, "waiter %x msg %d r_nodeid %d "
5369 "dir_nodeid %d overlap %d %d",
5370 lkb->lkb_id, mstype, r->res_nodeid,
5371 dlm_dir_nodeid(r), oc, ou);
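#if 0
/* Editor's distillation of the resend-vs-complete decision in
   dlm_recover_waiters_post() above (hypothetical enum and helper;
   oc/ou = an overlapped cancel/unlock was requested while waiting). */
enum example_action { EXAMPLE_COMPLETE_OVERLAP, EXAMPLE_RESEND };

static enum example_action example_post_action(int oc, int ou)
{
	/* an overlapping unlock/cancel supersedes the original op:
	   complete it locally instead of resending the request */
	if (oc || ou)
		return EXAMPLE_COMPLETE_OVERLAP;
	return EXAMPLE_RESEND;
}
#endif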
5381 static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
5382 struct list_head *list)
5384 struct dlm_lkb *lkb, *safe;
5386 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5387 if (!is_master_copy(lkb))
5390 /* don't purge lkbs we've added in recover_master_copy for
5391 the current recovery seq */
5393 if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5398 /* this put should free the lkb */
5399 if (!dlm_put_lkb(lkb))
5400 log_error(ls, "purged mstcpy lkb not released");
5404 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
5406 struct dlm_ls *ls = r->res_ls;
5408 purge_mstcpy_list(ls, r, &r->res_grantqueue);
5409 purge_mstcpy_list(ls, r, &r->res_convertqueue);
5410 purge_mstcpy_list(ls, r, &r->res_waitqueue);
5413 static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
5414 struct list_head *list,
5415 int nodeid_gone, unsigned int *count)
5417 struct dlm_lkb *lkb, *safe;
5419 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5420 if (!is_master_copy(lkb))
5423 if ((lkb->lkb_nodeid == nodeid_gone) ||
5424 dlm_is_removed(ls, lkb->lkb_nodeid)) {
5426 /* tell recover_lvb to invalidate the lvb
5427 because a node holding EX/PW failed */
5428 if ((lkb->lkb_exflags & DLM_LKF_VALBLK) &&
5429 (lkb->lkb_grmode >= DLM_LOCK_PW)) {
5430 rsb_set_flag(r, RSB_RECOVER_LVB_INVAL);
5435 /* this put should free the lkb */
5436 if (!dlm_put_lkb(lkb))
5437 log_error(ls, "purged dead lkb not released");
5439 rsb_set_flag(r, RSB_RECOVER_GRANT);
5446 /* Get rid of locks held by nodes that are gone. */
5448 void dlm_recover_purge(struct dlm_ls *ls)
5451 struct dlm_member *memb;
5452 int nodes_count = 0;
5453 int nodeid_gone = 0;
5454 unsigned int lkb_count = 0;
5456 /* cache one removed nodeid to optimize the common
5457 case of a single node removed */
5459 list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
5461 nodeid_gone = memb->nodeid;
5467 down_write(&ls->ls_root_sem);
5468 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
5472 purge_dead_list(ls, r, &r->res_grantqueue,
5473 nodeid_gone, &lkb_count);
5474 purge_dead_list(ls, r, &r->res_convertqueue,
5475 nodeid_gone, &lkb_count);
5476 purge_dead_list(ls, r, &r->res_waitqueue,
5477 nodeid_gone, &lkb_count);
5483 up_write(&ls->ls_root_sem);
5486 log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes",
5487 lkb_count, nodes_count);
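#if 0
/* Editor's sketch of the LVB invalidation rule in purge_dead_list()
   above: a departed node holding PW or EX may have written the LVB,
   so the value can no longer be trusted (hypothetical helper; mode
   ordering makes grmode >= DLM_LOCK_PW mean "PW or EX"). */
static int example_lvb_suspect(uint32_t exflags, int grmode)
{
	return (exflags & DLM_LKF_VALBLK) && (grmode >= DLM_LOCK_PW);
}
#endif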
5490 static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
5495 spin_lock(&ls->ls_rsbtbl[bucket].lock);
5496 for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
5497 r = rb_entry(n, struct dlm_rsb, res_hashnode);
5499 if (!rsb_flag(r, RSB_RECOVER_GRANT))
5501 if (!is_master(r)) {
5502 rsb_clear_flag(r, RSB_RECOVER_GRANT);
5506 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5509 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5514 * Attempt to grant locks on resources that we are the master of.
5515 * Locks may have become grantable during recovery because locks
5516 * from departed nodes have been purged (or not rebuilt), allowing
5517 * previously blocked locks to now be granted. The subset of rsb's
5518 * we are interested in are those with lkb's on either the convert or waiting queues.
5521 * Simplest would be to go through each master rsb and check for non-empty
5522 * convert or waiting queues, and attempt to grant on those rsbs.
5523 * Checking the queues requires lock_rsb, though, for which we'd need
5524 * to release the rsbtbl lock. This would make iterating through all
5525 * rsb's very inefficient. So, we rely on earlier recovery routines
5526 * to set RECOVER_GRANT on any rsb's that we should attempt to grant locks on.
5530 void dlm_recover_grant(struct dlm_ls *ls)
5534 unsigned int count = 0;
5535 unsigned int rsb_count = 0;
5536 unsigned int lkb_count = 0;
5539 r = find_grant_rsb(ls, bucket);
5541 if (bucket == ls->ls_rsbtbl_size - 1)
5549 /* the RECOVER_GRANT flag is checked in the grant path */
5550 grant_pending_locks(r, &count);
5551 rsb_clear_flag(r, RSB_RECOVER_GRANT);
5553 confirm_master(r, 0);
5560 log_rinfo(ls, "dlm_recover_grant %u locks on %u resources",
5561 lkb_count, rsb_count);
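#if 0
/* Editor's sketch (hypothetical condensed form) of the scan performed
   by dlm_recover_grant() above: rsbs were pre-marked RECOVER_GRANT by
   earlier recovery routines, so the walk never needs lock_rsb just to
   inspect queues. */
static void example_scan(struct dlm_ls *ls)
{
	struct dlm_rsb *r;
	int bucket = 0;

	while (bucket < ls->ls_rsbtbl_size) {
		r = find_grant_rsb(ls, bucket);	/* takes an rsb ref */
		if (!r) {
			bucket++;	/* bucket exhausted, move on */
			continue;
		}
		/* lock_rsb, grant_pending_locks(), clear the flag,
		   unlock and put the rsb: see dlm_recover_grant() */
	}
}
#endif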
5564 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
5567 struct dlm_lkb *lkb;
5569 list_for_each_entry(lkb, head, lkb_statequeue) {
5570 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
5576 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
5579 struct dlm_lkb *lkb;
5581 lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
5584 lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
5587 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
5593 /* needs at least dlm_rcom + rcom_lock */
5594 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5595 struct dlm_rsb *r, struct dlm_rcom *rc)
5597 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5599 lkb->lkb_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5600 lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5601 lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5602 lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5603 lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
5604 lkb->lkb_flags |= DLM_IFL_MSTCPY;
5605 lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
5606 lkb->lkb_rqmode = rl->rl_rqmode;
5607 lkb->lkb_grmode = rl->rl_grmode;
5608 /* don't set lkb_status because add_lkb wants to set it itself */
5610 lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
5611 lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
5613 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
5614 int lvblen = le16_to_cpu(rc->rc_header.h_length) -
5615 sizeof(struct dlm_rcom) - sizeof(struct rcom_lock);
5616 if (lvblen > ls->ls_lvblen)
5618 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
5619 if (!lkb->lkb_lvbptr)
5621 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
5624 /* Conversions between PR and CW (middle modes) need special handling.
5625 The real granted mode of these converting locks cannot be determined
5626 until all locks have been rebuilt on the rsb (recover_conversion) */
5628 if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
5629 middle_conversion(lkb)) {
5630 rl->rl_status = DLM_LKSTS_CONVERT;
5631 lkb->lkb_grmode = DLM_LOCK_IV;
5632 rsb_set_flag(r, RSB_RECOVER_CONVERT);
5638 /* This lkb may have been recovered in a previous aborted recovery so we need
5639 to check if the rsb already has an lkb with the given remote nodeid/lkid.
5640 If so we just send back a standard reply. If not, we create a new lkb with
5641 the given values and send back our lkid. We send back our lkid by sending
5642 back the rcom_lock struct we got but with the remid field filled in. */
5644 /* needs at least dlm_rcom + rcom_lock */
5645 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
5647 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5649 struct dlm_lkb *lkb;
5651 int from_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5654 if (rl->rl_parent_lkid) {
5655 error = -EOPNOTSUPP;
5659 remid = le32_to_cpu(rl->rl_lkid);
5661 /* In general we expect the rsb returned to be R_MASTER, but we don't
5662 have to require it. Recovery of masters on one node can overlap
5663 recovery of locks on another node, so one node can send us MSTCPY
5664 locks before we've made ourselves master of this rsb. We can still
5665 add new MSTCPY locks that we receive here without any harm; when
5666 we make ourselves master, dlm_recover_masters() won't touch the
5667 MSTCPY locks we've received early. */
5669 error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
5670 from_nodeid, R_RECEIVE_RECOVER, &r);
5676 if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
5677 log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
5678 from_nodeid, remid);
5683 lkb = search_remid(r, from_nodeid, remid);
5689 error = create_lkb(ls, &lkb);
5693 error = receive_rcom_lock_args(ls, lkb, r, rc);
5700 add_lkb(r, lkb, rl->rl_status);
5701 ls->ls_recover_locks_in++;
5703 if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
5704 rsb_set_flag(r, RSB_RECOVER_GRANT);
5707 /* this is the new value returned to the lock holder for
5708 saving in its process-copy lkb */
5709 rl->rl_remid = cpu_to_le32(lkb->lkb_id);
5711 lkb->lkb_recover_seq = ls->ls_recover_seq;
5717 if (error && error != -EEXIST)
5718 log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d",
5719 from_nodeid, remid, error);
5720 rl->rl_result = cpu_to_le32(error);
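/*
 * Editor's note: the lkid/remid handshake during lock recovery, as
 * implemented by dlm_recover_master_copy() above and
 * dlm_recover_process_copy() below (schematic only):
 *
 *   old holder (process copy)              new master (master copy)
 *   rcom_lock { rl_lkid = holder's id } -> find or create MSTCPY lkb,
 *                                          rl_remid = master's lkb_id
 *   lkb_remid = rl_remid                <- same rcom_lock echoed back
 */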
5724 /* needs at least dlm_rcom + rcom_lock */
5725 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
5727 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5729 struct dlm_lkb *lkb;
5730 uint32_t lkid, remid;
5733 lkid = le32_to_cpu(rl->rl_lkid);
5734 remid = le32_to_cpu(rl->rl_remid);
5735 result = le32_to_cpu(rl->rl_result);
5737 error = find_lkb(ls, lkid, &lkb);
5739 log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
5740 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5745 r = lkb->lkb_resource;
5749 if (!is_process_copy(lkb)) {
5750 log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
5751 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5762 /* There's a chance the new master received our lock before
5763 dlm_recover_master_reply(); this wouldn't happen if we did
5764 a barrier between recover_masters and recover_locks. */
5766 log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
5767 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5770 dlm_send_rcom_lock(r, lkb);
5774 lkb->lkb_remid = remid;
5777 log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
5778 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5782 /* an ack for dlm_recover_locks() which waits for replies from
5783 all the locks it sends to new masters */
5784 dlm_recovered_lock(r);
5793 #ifdef CONFIG_DLM_DEPRECATED_API
5794 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
5795 int mode, uint32_t flags, void *name, unsigned int namelen,
5796 unsigned long timeout_cs)
5798 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
5799 int mode, uint32_t flags, void *name, unsigned int namelen)
5802 struct dlm_lkb *lkb;
5803 struct dlm_args args;
5806 dlm_lock_recovery(ls);
5808 error = create_lkb(ls, &lkb);
5814 if (flags & DLM_LKF_VALBLK) {
5815 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5816 if (!ua->lksb.sb_lvbptr) {
5823 #ifdef CONFIG_DLM_DEPRECATED_API
5824 error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
5825 fake_astfn, ua, fake_bastfn, &args);
5827 error = set_lock_args(mode, &ua->lksb, flags, namelen, fake_astfn, ua,
5828 fake_bastfn, &args);
5831 kfree(ua->lksb.sb_lvbptr);
5832 ua->lksb.sb_lvbptr = NULL;
5838 /* After ua is attached to lkb it will be freed by dlm_free_lkb().
5839 When DLM_IFL_USER is set, the dlm knows that this is a userspace
5840 lock and that lkb_astparam is the dlm_user_args structure. */
5841 lkb->lkb_flags |= DLM_IFL_USER;
5842 error = request_lock(ls, lkb, name, namelen, &args);
5858 /* add this new lkb to the per-process list of locks */
5859 spin_lock(&ua->proc->locks_spin);
5861 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5862 spin_unlock(&ua->proc->locks_spin);
5864 dlm_unlock_recovery(ls);
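#if 0
/* Editor's hedged sketch, for comparison with the dlm_user_* entry
   points: the in-kernel API as declared (to the editor's recollection)
   in include/linux/dlm.h. The example_* names are hypothetical and
   this block is not built; error handling is elided. */
static struct dlm_lksb example_lksb;

static void example_ast(void *astarg)
{
	/* completion callback: example_lksb.sb_status holds the result */
}

static void example_bast(void *astarg, int mode)
{
	/* blocking callback: another node wants an incompatible mode */
}

static int example_take_lock(dlm_lockspace_t *ls)
{
	return dlm_lock(ls, DLM_LOCK_EX, &example_lksb, 0,
			"example_res", 11, 0, example_ast, NULL,
			example_bast);
}
#endif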
5868 #ifdef CONFIG_DLM_DEPRECATED_API
5869 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5870 int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
5871 unsigned long timeout_cs)
5873 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5874 int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
5877 struct dlm_lkb *lkb;
5878 struct dlm_args args;
5879 struct dlm_user_args *ua;
5882 dlm_lock_recovery(ls);
5884 error = find_lkb(ls, lkid, &lkb);
5888 /* user can change the params on its lock when it converts it, or
5889 add an lvb that didn't exist before */
5893 if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
5894 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5895 if (!ua->lksb.sb_lvbptr) {
5900 if (lvb_in && ua->lksb.sb_lvbptr)
5901 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5903 ua->xid = ua_tmp->xid;
5904 ua->castparam = ua_tmp->castparam;
5905 ua->castaddr = ua_tmp->castaddr;
5906 ua->bastparam = ua_tmp->bastparam;
5907 ua->bastaddr = ua_tmp->bastaddr;
5908 ua->user_lksb = ua_tmp->user_lksb;
5910 #ifdef CONFIG_DLM_DEPRECATED_API
5911 error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
5912 fake_astfn, ua, fake_bastfn, &args);
5914 error = set_lock_args(mode, &ua->lksb, flags, 0, fake_astfn, ua,
5915 fake_bastfn, &args);
5920 error = convert_lock(ls, lkb, &args);
5922 if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
5927 dlm_unlock_recovery(ls);
5933 * The caller asks for an orphan lock on a given resource with a given mode.
5934 * If a matching lock exists, it's moved to the owner's list of locks and
5935 * the lkid is returned.
5938 int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5939 int mode, uint32_t flags, void *name, unsigned int namelen,
5942 struct dlm_lkb *lkb = NULL, *iter;
5943 struct dlm_user_args *ua;
5944 int found_other_mode = 0;
5947 mutex_lock(&ls->ls_orphans_mutex);
5948 list_for_each_entry(iter, &ls->ls_orphans, lkb_ownqueue) {
5949 if (iter->lkb_resource->res_length != namelen)
5951 if (memcmp(iter->lkb_resource->res_name, name, namelen))
5953 if (iter->lkb_grmode != mode) {
5954 found_other_mode = 1;
5959 list_del_init(&iter->lkb_ownqueue);
5960 iter->lkb_flags &= ~DLM_IFL_ORPHAN;
5961 *lkid = iter->lkb_id;
5964 mutex_unlock(&ls->ls_orphans_mutex);
5966 if (!lkb && found_other_mode) {
5976 lkb->lkb_exflags = flags;
5977 lkb->lkb_ownpid = (int) current->pid;
5981 ua->proc = ua_tmp->proc;
5982 ua->xid = ua_tmp->xid;
5983 ua->castparam = ua_tmp->castparam;
5984 ua->castaddr = ua_tmp->castaddr;
5985 ua->bastparam = ua_tmp->bastparam;
5986 ua->bastaddr = ua_tmp->bastaddr;
5987 ua->user_lksb = ua_tmp->user_lksb;
5990 * The lkb reference from the ls_orphans list was not
5991 * removed above, and is now considered the reference
5992 * for the proc locks list.
5995 spin_lock(&ua->proc->locks_spin);
5996 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5997 spin_unlock(&ua->proc->locks_spin);
6003 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
6004 uint32_t flags, uint32_t lkid, char *lvb_in)
6006 struct dlm_lkb *lkb;
6007 struct dlm_args args;
6008 struct dlm_user_args *ua;
6011 dlm_lock_recovery(ls);
6013 error = find_lkb(ls, lkid, &lkb);
6019 if (lvb_in && ua->lksb.sb_lvbptr)
6020 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
6021 if (ua_tmp->castparam)
6022 ua->castparam = ua_tmp->castparam;
6023 ua->user_lksb = ua_tmp->user_lksb;
6025 error = set_unlock_args(flags, ua, &args);
6029 error = unlock_lock(ls, lkb, &args);
6031 if (error == -DLM_EUNLOCK)
6033 /* from validate_unlock_args() */
6034 if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
6039 spin_lock(&ua->proc->locks_spin);
6040 /* dlm_user_add_cb() may have already taken lkb off the proc list */
6041 if (!list_empty(&lkb->lkb_ownqueue))
6042 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
6043 spin_unlock(&ua->proc->locks_spin);
6047 dlm_unlock_recovery(ls);
6052 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
6053 uint32_t flags, uint32_t lkid)
6055 struct dlm_lkb *lkb;
6056 struct dlm_args args;
6057 struct dlm_user_args *ua;
6060 dlm_lock_recovery(ls);
6062 error = find_lkb(ls, lkid, &lkb);
6067 if (ua_tmp->castparam)
6068 ua->castparam = ua_tmp->castparam;
6069 ua->user_lksb = ua_tmp->user_lksb;
6071 error = set_unlock_args(flags, ua, &args);
6075 error = cancel_lock(ls, lkb, &args);
6077 if (error == -DLM_ECANCEL)
6079 /* from validate_unlock_args() */
6080 if (error == -EBUSY)
6085 dlm_unlock_recovery(ls);
6090 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
6092 struct dlm_lkb *lkb;
6093 struct dlm_args args;
6094 struct dlm_user_args *ua;
6098 dlm_lock_recovery(ls);
6100 error = find_lkb(ls, lkid, &lkb);
6106 error = set_unlock_args(flags, ua, &args);
6110 /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
6112 r = lkb->lkb_resource;
6116 error = validate_unlock_args(lkb, &args);
6119 lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
6121 error = _cancel_lock(r, lkb);
6126 if (error == -DLM_ECANCEL)
6128 /* from validate_unlock_args() */
6129 if (error == -EBUSY)
6134 dlm_unlock_recovery(ls);
6138 /* lkb's that are removed from the waiters list by revert are just left on the
6139 orphans list with the granted orphan locks, to be freed by purge */
6141 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6143 struct dlm_args args;
6146 hold_lkb(lkb); /* reference for the ls_orphans list */
6147 mutex_lock(&ls->ls_orphans_mutex);
6148 list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
6149 mutex_unlock(&ls->ls_orphans_mutex);
6151 set_unlock_args(0, lkb->lkb_ua, &args);
6153 error = cancel_lock(ls, lkb, &args);
6154 if (error == -DLM_ECANCEL)
6159 /* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't
6160 granted. Regardless of what rsb queue the lock is on, it's removed and
6161 freed. The IVVALBLK flag causes the lvb on the resource to be invalidated
6162 if our lock is PW/EX (it's ignored if our granted mode is smaller). */
6164 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6166 struct dlm_args args;
6169 set_unlock_args(DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK,
6170 lkb->lkb_ua, &args);
6172 error = unlock_lock(ls, lkb, &args);
6173 if (error == -DLM_EUNLOCK)
6178 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
6179 (which does lock_rsb) due to deadlock with receiving a message that does
6180 lock_rsb followed by dlm_user_add_cb() */
6182 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
6183 struct dlm_user_proc *proc)
6185 struct dlm_lkb *lkb = NULL;
6187 mutex_lock(&ls->ls_clear_proc_locks);
6188 if (list_empty(&proc->locks))
6191 lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
6192 list_del_init(&lkb->lkb_ownqueue);
6194 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6195 lkb->lkb_flags |= DLM_IFL_ORPHAN;
6197 lkb->lkb_flags |= DLM_IFL_DEAD;
6199 mutex_unlock(&ls->ls_clear_proc_locks);
6203 /* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
6204 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
6205 which we clear here. */
6207 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
6208 list, and no more device_writes should add lkb's to proc->locks list; so we
6209 shouldn't need to take asts_spin or locks_spin here. this assumes that
6210 device reads/writes/closes are serialized -- FIXME: we may need to serialize them ourselves if there is a point of entry that does not use device */
6213 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6215 struct dlm_lkb *lkb, *safe;
6217 dlm_lock_recovery(ls);
6220 lkb = del_proc_lock(ls, proc);
6224 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6225 orphan_proc_lock(ls, lkb);
6227 unlock_proc_lock(ls, lkb);
6229 /* this removes the reference for the proc->locks list
6230 added by dlm_user_request; it may result in the lkb being freed */
6236 mutex_lock(&ls->ls_clear_proc_locks);
6238 /* in-progress unlocks */
6239 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6240 list_del_init(&lkb->lkb_ownqueue);
6241 lkb->lkb_flags |= DLM_IFL_DEAD;
6245 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
6246 memset(&lkb->lkb_callbacks, 0,
6247 sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
6248 list_del_init(&lkb->lkb_cb_list);
6252 mutex_unlock(&ls->ls_clear_proc_locks);
6253 dlm_unlock_recovery(ls);
6256 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6258 struct dlm_lkb *lkb, *safe;
6262 spin_lock(&proc->locks_spin);
6263 if (!list_empty(&proc->locks)) {
6264 lkb = list_entry(proc->locks.next, struct dlm_lkb,
6266 list_del_init(&lkb->lkb_ownqueue);
6268 spin_unlock(&proc->locks_spin);
6273 lkb->lkb_flags |= DLM_IFL_DEAD;
6274 unlock_proc_lock(ls, lkb);
6275 dlm_put_lkb(lkb); /* ref from proc->locks list */
6278 spin_lock(&proc->locks_spin);
6279 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6280 list_del_init(&lkb->lkb_ownqueue);
6281 lkb->lkb_flags |= DLM_IFL_DEAD;
6284 spin_unlock(&proc->locks_spin);
6286 spin_lock(&proc->asts_spin);
6287 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
6288 memset(&lkb->lkb_callbacks, 0,
6289 sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
6290 list_del_init(&lkb->lkb_cb_list);
6293 spin_unlock(&proc->asts_spin);
6296 /* pid of 0 means purge all orphans */
6298 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
6300 struct dlm_lkb *lkb, *safe;
6302 mutex_lock(&ls->ls_orphans_mutex);
6303 list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
6304 if (pid && lkb->lkb_ownpid != pid)
6306 unlock_proc_lock(ls, lkb);
6307 list_del_init(&lkb->lkb_ownqueue);
6310 mutex_unlock(&ls->ls_orphans_mutex);
6313 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
6315 struct dlm_message *ms;
6316 struct dlm_mhandle *mh;
6319 error = _create_message(ls, sizeof(struct dlm_message), nodeid,
6320 DLM_MSG_PURGE, &ms, &mh);
6323 ms->m_nodeid = cpu_to_le32(nodeid);
6324 ms->m_pid = cpu_to_le32(pid);
6326 return send_message(mh, ms);
6329 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
6330 int nodeid, int pid)
6334 if (nodeid && (nodeid != dlm_our_nodeid())) {
6335 error = send_purge(ls, nodeid, pid);
6337 dlm_lock_recovery(ls);
6338 if (pid == current->pid)
6339 purge_proc_locks(ls, proc);
6341 do_purge(ls, nodeid, pid);
6342 dlm_unlock_recovery(ls);
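#if 0
/* Editor's sketch of the purge routing in dlm_user_purge() above: a
   purge aimed at another node becomes a DLM_MSG_PURGE message; a purge
   of our own pid clears the proc's locks; otherwise local orphans are
   purged directly (condensed form, same helpers as above). */
static void example_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
			  int nodeid, int pid)
{
	if (nodeid && nodeid != dlm_our_nodeid())
		send_purge(ls, nodeid, pid);
	else if (pid == current->pid)
		purge_proc_locks(ls, proc);
	else
		do_purge(ls, nodeid, pid);
}
#endif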
6347 /* debug functionality */
6348 int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
6349 int lkb_nodeid, unsigned int lkb_flags, int lkb_status)
6351 struct dlm_lksb *lksb;
6352 struct dlm_lkb *lkb;
6356 /* we currently can't set a valid user lock */
6357 if (lkb_flags & DLM_IFL_USER)
6360 lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
6364 error = _create_lkb(ls, &lkb, lkb_id, lkb_id + 1);
6370 lkb->lkb_flags = lkb_flags;
6371 lkb->lkb_nodeid = lkb_nodeid;
6372 lkb->lkb_lksb = lksb;
6373 /* user-specific pointer; just don't leave it NULL for kernel locks */
6374 if (~lkb_flags & DLM_IFL_USER)
6375 lkb->lkb_astparam = (void *)0xDEADBEEF;
6377 error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
6386 add_lkb(r, lkb, lkb_status);
6393 int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id,
6394 int mstype, int to_nodeid)
6396 struct dlm_lkb *lkb;
6399 error = find_lkb(ls, lkb_id, &lkb);
6403 error = add_to_waiters(lkb, mstype, to_nodeid);