GNU Linux-libre 5.4.257-gnu1
[releases.git] / fs / dlm / lock.c
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
6 **
7 **
8 *******************************************************************************
9 ******************************************************************************/
10
11 /* Central locking logic has four stages:
12
13    dlm_lock()
14    dlm_unlock()
15
16    request_lock(ls, lkb)
17    convert_lock(ls, lkb)
18    unlock_lock(ls, lkb)
19    cancel_lock(ls, lkb)
20
21    _request_lock(r, lkb)
22    _convert_lock(r, lkb)
23    _unlock_lock(r, lkb)
24    _cancel_lock(r, lkb)
25
26    do_request(r, lkb)
27    do_convert(r, lkb)
28    do_unlock(r, lkb)
29    do_cancel(r, lkb)
30
31    Stage 1 (lock, unlock) is mainly about checking input args and
32    splitting into one of the four main operations:
33
34        dlm_lock          = request_lock
35        dlm_lock+CONVERT  = convert_lock
36        dlm_unlock        = unlock_lock
37        dlm_unlock+CANCEL = cancel_lock
38
39    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
40    provided to the next stage.
41
42    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
43    When remote, it calls send_xxxx(), when local it calls do_xxxx().
44
45    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
46    given rsb and lkb and queues callbacks.
47
48    For remote operations, send_xxxx() results in the corresponding do_xxxx()
49    function being executed on the remote node.  The connecting send/receive
50    calls on local (L) and remote (R) nodes:
51
52    L: send_xxxx()              ->  R: receive_xxxx()
53                                    R: do_xxxx()
54    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
55 */
56 #include <linux/types.h>
57 #include <linux/rbtree.h>
58 #include <linux/slab.h>
59 #include "dlm_internal.h"
60 #include <linux/dlm_device.h>
61 #include "memory.h"
62 #include "lowcomms.h"
63 #include "requestqueue.h"
64 #include "util.h"
65 #include "dir.h"
66 #include "member.h"
67 #include "lockspace.h"
68 #include "ast.h"
69 #include "lock.h"
70 #include "rcom.h"
71 #include "recover.h"
72 #include "lvb_table.h"
73 #include "user.h"
74 #include "config.h"
75
76 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
77 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
82 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_remove(struct dlm_rsb *r);
84 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
86 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
87                                     struct dlm_message *ms);
88 static int receive_extralen(struct dlm_message *ms);
89 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
90 static void del_timeout(struct dlm_lkb *lkb);
91 static void toss_rsb(struct kref *kref);
92
93 /*
94  * Lock compatibilty matrix - thanks Steve
95  * UN = Unlocked state. Not really a state, used as a flag
96  * PD = Padding. Used to make the matrix a nice power of two in size
97  * Other states are the same as the VMS DLM.
98  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
99  */
100
101 static const int __dlm_compat_matrix[8][8] = {
102       /* UN NL CR CW PR PW EX PD */
103         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
104         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
105         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
106         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
107         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
108         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
109         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
110         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
111 };
112
113 /*
114  * This defines the direction of transfer of LVB data.
115  * Granted mode is the row; requested mode is the column.
116  * Usage: matrix[grmode+1][rqmode+1]
117  * 1 = LVB is returned to the caller
118  * 0 = LVB is written to the resource
119  * -1 = nothing happens to the LVB
120  */
121
122 const int dlm_lvb_operations[8][8] = {
123         /* UN   NL  CR  CW  PR  PW  EX  PD*/
124         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
125         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
126         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
127         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
128         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
129         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
130         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
131         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
132 };
133
134 #define modes_compat(gr, rq) \
135         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
136
137 int dlm_modes_compat(int mode1, int mode2)
138 {
139         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
140 }
141
142 /*
143  * Compatibility matrix for conversions with QUECVT set.
144  * Granted mode is the row; requested mode is the column.
145  * Usage: matrix[grmode+1][rqmode+1]
146  */
147
148 static const int __quecvt_compat_matrix[8][8] = {
149       /* UN NL CR CW PR PW EX PD */
150         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
151         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
152         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
153         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
154         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
155         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
156         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
157         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
158 };
159
160 void dlm_print_lkb(struct dlm_lkb *lkb)
161 {
162         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
163                "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
164                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
165                lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
166                lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
167                (unsigned long long)lkb->lkb_recover_seq);
168 }
169
170 static void dlm_print_rsb(struct dlm_rsb *r)
171 {
172         printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
173                "rlc %d name %s\n",
174                r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
175                r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
176                r->res_name);
177 }
178
179 void dlm_dump_rsb(struct dlm_rsb *r)
180 {
181         struct dlm_lkb *lkb;
182
183         dlm_print_rsb(r);
184
185         printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
186                list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
187         printk(KERN_ERR "rsb lookup list\n");
188         list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
189                 dlm_print_lkb(lkb);
190         printk(KERN_ERR "rsb grant queue:\n");
191         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
192                 dlm_print_lkb(lkb);
193         printk(KERN_ERR "rsb convert queue:\n");
194         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
195                 dlm_print_lkb(lkb);
196         printk(KERN_ERR "rsb wait queue:\n");
197         list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
198                 dlm_print_lkb(lkb);
199 }
200
201 /* Threads cannot use the lockspace while it's being recovered */
202
203 static inline void dlm_lock_recovery(struct dlm_ls *ls)
204 {
205         down_read(&ls->ls_in_recovery);
206 }
207
208 void dlm_unlock_recovery(struct dlm_ls *ls)
209 {
210         up_read(&ls->ls_in_recovery);
211 }
212
213 int dlm_lock_recovery_try(struct dlm_ls *ls)
214 {
215         return down_read_trylock(&ls->ls_in_recovery);
216 }
217
218 static inline int can_be_queued(struct dlm_lkb *lkb)
219 {
220         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
221 }
222
223 static inline int force_blocking_asts(struct dlm_lkb *lkb)
224 {
225         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
226 }
227
228 static inline int is_demoted(struct dlm_lkb *lkb)
229 {
230         return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
231 }
232
233 static inline int is_altmode(struct dlm_lkb *lkb)
234 {
235         return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
236 }
237
238 static inline int is_granted(struct dlm_lkb *lkb)
239 {
240         return (lkb->lkb_status == DLM_LKSTS_GRANTED);
241 }
242
243 static inline int is_remote(struct dlm_rsb *r)
244 {
245         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
246         return !!r->res_nodeid;
247 }
248
249 static inline int is_process_copy(struct dlm_lkb *lkb)
250 {
251         return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
252 }
253
254 static inline int is_master_copy(struct dlm_lkb *lkb)
255 {
256         return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
257 }
258
259 static inline int middle_conversion(struct dlm_lkb *lkb)
260 {
261         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
262             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
263                 return 1;
264         return 0;
265 }
266
267 static inline int down_conversion(struct dlm_lkb *lkb)
268 {
269         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
270 }
271
272 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
273 {
274         return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
275 }
276
277 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
278 {
279         return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
280 }
281
282 static inline int is_overlap(struct dlm_lkb *lkb)
283 {
284         return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
285                                   DLM_IFL_OVERLAP_CANCEL));
286 }
287
288 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
289 {
290         if (is_master_copy(lkb))
291                 return;
292
293         del_timeout(lkb);
294
295         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
296
297         /* if the operation was a cancel, then return -DLM_ECANCEL, if a
298            timeout caused the cancel then return -ETIMEDOUT */
299         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
300                 lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
301                 rv = -ETIMEDOUT;
302         }
303
304         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
305                 lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
306                 rv = -EDEADLK;
307         }
308
309         dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
310 }
311
312 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
313 {
314         queue_cast(r, lkb,
315                    is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
316 }
317
318 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
319 {
320         if (is_master_copy(lkb)) {
321                 send_bast(r, lkb, rqmode);
322         } else {
323                 dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
324         }
325 }
326
327 /*
328  * Basic operations on rsb's and lkb's
329  */
330
331 /* This is only called to add a reference when the code already holds
332    a valid reference to the rsb, so there's no need for locking. */
333
334 static inline void hold_rsb(struct dlm_rsb *r)
335 {
336         kref_get(&r->res_ref);
337 }
338
339 void dlm_hold_rsb(struct dlm_rsb *r)
340 {
341         hold_rsb(r);
342 }
343
344 /* When all references to the rsb are gone it's transferred to
345    the tossed list for later disposal. */
346
347 static void put_rsb(struct dlm_rsb *r)
348 {
349         struct dlm_ls *ls = r->res_ls;
350         uint32_t bucket = r->res_bucket;
351
352         spin_lock(&ls->ls_rsbtbl[bucket].lock);
353         kref_put(&r->res_ref, toss_rsb);
354         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
355 }
356
357 void dlm_put_rsb(struct dlm_rsb *r)
358 {
359         put_rsb(r);
360 }
361
362 static int pre_rsb_struct(struct dlm_ls *ls)
363 {
364         struct dlm_rsb *r1, *r2;
365         int count = 0;
366
367         spin_lock(&ls->ls_new_rsb_spin);
368         if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
369                 spin_unlock(&ls->ls_new_rsb_spin);
370                 return 0;
371         }
372         spin_unlock(&ls->ls_new_rsb_spin);
373
374         r1 = dlm_allocate_rsb(ls);
375         r2 = dlm_allocate_rsb(ls);
376
377         spin_lock(&ls->ls_new_rsb_spin);
378         if (r1) {
379                 list_add(&r1->res_hashchain, &ls->ls_new_rsb);
380                 ls->ls_new_rsb_count++;
381         }
382         if (r2) {
383                 list_add(&r2->res_hashchain, &ls->ls_new_rsb);
384                 ls->ls_new_rsb_count++;
385         }
386         count = ls->ls_new_rsb_count;
387         spin_unlock(&ls->ls_new_rsb_spin);
388
389         if (!count)
390                 return -ENOMEM;
391         return 0;
392 }
393
394 /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
395    unlock any spinlocks, go back and call pre_rsb_struct again.
396    Otherwise, take an rsb off the list and return it. */
397
398 static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
399                           struct dlm_rsb **r_ret)
400 {
401         struct dlm_rsb *r;
402         int count;
403
404         spin_lock(&ls->ls_new_rsb_spin);
405         if (list_empty(&ls->ls_new_rsb)) {
406                 count = ls->ls_new_rsb_count;
407                 spin_unlock(&ls->ls_new_rsb_spin);
408                 log_debug(ls, "find_rsb retry %d %d %s",
409                           count, dlm_config.ci_new_rsb_count, name);
410                 return -EAGAIN;
411         }
412
413         r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
414         list_del(&r->res_hashchain);
415         /* Convert the empty list_head to a NULL rb_node for tree usage: */
416         memset(&r->res_hashnode, 0, sizeof(struct rb_node));
417         ls->ls_new_rsb_count--;
418         spin_unlock(&ls->ls_new_rsb_spin);
419
420         r->res_ls = ls;
421         r->res_length = len;
422         memcpy(r->res_name, name, len);
423         mutex_init(&r->res_mutex);
424
425         INIT_LIST_HEAD(&r->res_lookup);
426         INIT_LIST_HEAD(&r->res_grantqueue);
427         INIT_LIST_HEAD(&r->res_convertqueue);
428         INIT_LIST_HEAD(&r->res_waitqueue);
429         INIT_LIST_HEAD(&r->res_root_list);
430         INIT_LIST_HEAD(&r->res_recover_list);
431
432         *r_ret = r;
433         return 0;
434 }
435
436 static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
437 {
438         char maxname[DLM_RESNAME_MAXLEN];
439
440         memset(maxname, 0, DLM_RESNAME_MAXLEN);
441         memcpy(maxname, name, nlen);
442         return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
443 }
444
445 int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
446                         struct dlm_rsb **r_ret)
447 {
448         struct rb_node *node = tree->rb_node;
449         struct dlm_rsb *r;
450         int rc;
451
452         while (node) {
453                 r = rb_entry(node, struct dlm_rsb, res_hashnode);
454                 rc = rsb_cmp(r, name, len);
455                 if (rc < 0)
456                         node = node->rb_left;
457                 else if (rc > 0)
458                         node = node->rb_right;
459                 else
460                         goto found;
461         }
462         *r_ret = NULL;
463         return -EBADR;
464
465  found:
466         *r_ret = r;
467         return 0;
468 }
469
470 static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
471 {
472         struct rb_node **newn = &tree->rb_node;
473         struct rb_node *parent = NULL;
474         int rc;
475
476         while (*newn) {
477                 struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
478                                                res_hashnode);
479
480                 parent = *newn;
481                 rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
482                 if (rc < 0)
483                         newn = &parent->rb_left;
484                 else if (rc > 0)
485                         newn = &parent->rb_right;
486                 else {
487                         log_print("rsb_insert match");
488                         dlm_dump_rsb(rsb);
489                         dlm_dump_rsb(cur);
490                         return -EEXIST;
491                 }
492         }
493
494         rb_link_node(&rsb->res_hashnode, parent, newn);
495         rb_insert_color(&rsb->res_hashnode, tree);
496         return 0;
497 }
498
499 /*
500  * Find rsb in rsbtbl and potentially create/add one
501  *
502  * Delaying the release of rsb's has a similar benefit to applications keeping
503  * NL locks on an rsb, but without the guarantee that the cached master value
504  * will still be valid when the rsb is reused.  Apps aren't always smart enough
505  * to keep NL locks on an rsb that they may lock again shortly; this can lead
506  * to excessive master lookups and removals if we don't delay the release.
507  *
508  * Searching for an rsb means looking through both the normal list and toss
509  * list.  When found on the toss list the rsb is moved to the normal list with
510  * ref count of 1; when found on normal list the ref count is incremented.
511  *
512  * rsb's on the keep list are being used locally and refcounted.
513  * rsb's on the toss list are not being used locally, and are not refcounted.
514  *
515  * The toss list rsb's were either
516  * - previously used locally but not any more (were on keep list, then
517  *   moved to toss list when last refcount dropped)
518  * - created and put on toss list as a directory record for a lookup
519  *   (we are the dir node for the res, but are not using the res right now,
520  *   but some other node is)
521  *
522  * The purpose of find_rsb() is to return a refcounted rsb for local use.
523  * So, if the given rsb is on the toss list, it is moved to the keep list
524  * before being returned.
525  *
526  * toss_rsb() happens when all local usage of the rsb is done, i.e. no
527  * more refcounts exist, so the rsb is moved from the keep list to the
528  * toss list.
529  *
530  * rsb's on both keep and toss lists are used for doing a name to master
531  * lookups.  rsb's that are in use locally (and being refcounted) are on
532  * the keep list, rsb's that are not in use locally (not refcounted) and
533  * only exist for name/master lookups are on the toss list.
534  *
535  * rsb's on the toss list who's dir_nodeid is not local can have stale
536  * name/master mappings.  So, remote requests on such rsb's can potentially
537  * return with an error, which means the mapping is stale and needs to
538  * be updated with a new lookup.  (The idea behind MASTER UNCERTAIN and
539  * first_lkid is to keep only a single outstanding request on an rsb
540  * while that rsb has a potentially stale master.)
541  */
542
543 static int find_rsb_dir(struct dlm_ls *ls, char *name, int len,
544                         uint32_t hash, uint32_t b,
545                         int dir_nodeid, int from_nodeid,
546                         unsigned int flags, struct dlm_rsb **r_ret)
547 {
548         struct dlm_rsb *r = NULL;
549         int our_nodeid = dlm_our_nodeid();
550         int from_local = 0;
551         int from_other = 0;
552         int from_dir = 0;
553         int create = 0;
554         int error;
555
556         if (flags & R_RECEIVE_REQUEST) {
557                 if (from_nodeid == dir_nodeid)
558                         from_dir = 1;
559                 else
560                         from_other = 1;
561         } else if (flags & R_REQUEST) {
562                 from_local = 1;
563         }
564
565         /*
566          * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
567          * from_nodeid has sent us a lock in dlm_recover_locks, believing
568          * we're the new master.  Our local recovery may not have set
569          * res_master_nodeid to our_nodeid yet, so allow either.  Don't
570          * create the rsb; dlm_recover_process_copy() will handle EBADR
571          * by resending.
572          *
573          * If someone sends us a request, we are the dir node, and we do
574          * not find the rsb anywhere, then recreate it.  This happens if
575          * someone sends us a request after we have removed/freed an rsb
576          * from our toss list.  (They sent a request instead of lookup
577          * because they are using an rsb from their toss list.)
578          */
579
580         if (from_local || from_dir ||
581             (from_other && (dir_nodeid == our_nodeid))) {
582                 create = 1;
583         }
584
585  retry:
586         if (create) {
587                 error = pre_rsb_struct(ls);
588                 if (error < 0)
589                         goto out;
590         }
591
592         spin_lock(&ls->ls_rsbtbl[b].lock);
593
594         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
595         if (error)
596                 goto do_toss;
597         
598         /*
599          * rsb is active, so we can't check master_nodeid without lock_rsb.
600          */
601
602         kref_get(&r->res_ref);
603         error = 0;
604         goto out_unlock;
605
606
607  do_toss:
608         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
609         if (error)
610                 goto do_new;
611
612         /*
613          * rsb found inactive (master_nodeid may be out of date unless
614          * we are the dir_nodeid or were the master)  No other thread
615          * is using this rsb because it's on the toss list, so we can
616          * look at or update res_master_nodeid without lock_rsb.
617          */
618
619         if ((r->res_master_nodeid != our_nodeid) && from_other) {
620                 /* our rsb was not master, and another node (not the dir node)
621                    has sent us a request */
622                 log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s",
623                           from_nodeid, r->res_master_nodeid, dir_nodeid,
624                           r->res_name);
625                 error = -ENOTBLK;
626                 goto out_unlock;
627         }
628
629         if ((r->res_master_nodeid != our_nodeid) && from_dir) {
630                 /* don't think this should ever happen */
631                 log_error(ls, "find_rsb toss from_dir %d master %d",
632                           from_nodeid, r->res_master_nodeid);
633                 dlm_print_rsb(r);
634                 /* fix it and go on */
635                 r->res_master_nodeid = our_nodeid;
636                 r->res_nodeid = 0;
637                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
638                 r->res_first_lkid = 0;
639         }
640
641         if (from_local && (r->res_master_nodeid != our_nodeid)) {
642                 /* Because we have held no locks on this rsb,
643                    res_master_nodeid could have become stale. */
644                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
645                 r->res_first_lkid = 0;
646         }
647
648         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
649         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
650         goto out_unlock;
651
652
653  do_new:
654         /*
655          * rsb not found
656          */
657
658         if (error == -EBADR && !create)
659                 goto out_unlock;
660
661         error = get_rsb_struct(ls, name, len, &r);
662         if (error == -EAGAIN) {
663                 spin_unlock(&ls->ls_rsbtbl[b].lock);
664                 goto retry;
665         }
666         if (error)
667                 goto out_unlock;
668
669         r->res_hash = hash;
670         r->res_bucket = b;
671         r->res_dir_nodeid = dir_nodeid;
672         kref_init(&r->res_ref);
673
674         if (from_dir) {
675                 /* want to see how often this happens */
676                 log_debug(ls, "find_rsb new from_dir %d recreate %s",
677                           from_nodeid, r->res_name);
678                 r->res_master_nodeid = our_nodeid;
679                 r->res_nodeid = 0;
680                 goto out_add;
681         }
682
683         if (from_other && (dir_nodeid != our_nodeid)) {
684                 /* should never happen */
685                 log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
686                           from_nodeid, dir_nodeid, our_nodeid, r->res_name);
687                 dlm_free_rsb(r);
688                 r = NULL;
689                 error = -ENOTBLK;
690                 goto out_unlock;
691         }
692
693         if (from_other) {
694                 log_debug(ls, "find_rsb new from_other %d dir %d %s",
695                           from_nodeid, dir_nodeid, r->res_name);
696         }
697
698         if (dir_nodeid == our_nodeid) {
699                 /* When we are the dir nodeid, we can set the master
700                    node immediately */
701                 r->res_master_nodeid = our_nodeid;
702                 r->res_nodeid = 0;
703         } else {
704                 /* set_master will send_lookup to dir_nodeid */
705                 r->res_master_nodeid = 0;
706                 r->res_nodeid = -1;
707         }
708
709  out_add:
710         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
711  out_unlock:
712         spin_unlock(&ls->ls_rsbtbl[b].lock);
713  out:
714         *r_ret = r;
715         return error;
716 }
717
718 /* During recovery, other nodes can send us new MSTCPY locks (from
719    dlm_recover_locks) before we've made ourself master (in
720    dlm_recover_masters). */
721
722 static int find_rsb_nodir(struct dlm_ls *ls, char *name, int len,
723                           uint32_t hash, uint32_t b,
724                           int dir_nodeid, int from_nodeid,
725                           unsigned int flags, struct dlm_rsb **r_ret)
726 {
727         struct dlm_rsb *r = NULL;
728         int our_nodeid = dlm_our_nodeid();
729         int recover = (flags & R_RECEIVE_RECOVER);
730         int error;
731
732  retry:
733         error = pre_rsb_struct(ls);
734         if (error < 0)
735                 goto out;
736
737         spin_lock(&ls->ls_rsbtbl[b].lock);
738
739         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
740         if (error)
741                 goto do_toss;
742
743         /*
744          * rsb is active, so we can't check master_nodeid without lock_rsb.
745          */
746
747         kref_get(&r->res_ref);
748         goto out_unlock;
749
750
751  do_toss:
752         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
753         if (error)
754                 goto do_new;
755
756         /*
757          * rsb found inactive. No other thread is using this rsb because
758          * it's on the toss list, so we can look at or update
759          * res_master_nodeid without lock_rsb.
760          */
761
762         if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
763                 /* our rsb is not master, and another node has sent us a
764                    request; this should never happen */
765                 log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d",
766                           from_nodeid, r->res_master_nodeid, dir_nodeid);
767                 dlm_print_rsb(r);
768                 error = -ENOTBLK;
769                 goto out_unlock;
770         }
771
772         if (!recover && (r->res_master_nodeid != our_nodeid) &&
773             (dir_nodeid == our_nodeid)) {
774                 /* our rsb is not master, and we are dir; may as well fix it;
775                    this should never happen */
776                 log_error(ls, "find_rsb toss our %d master %d dir %d",
777                           our_nodeid, r->res_master_nodeid, dir_nodeid);
778                 dlm_print_rsb(r);
779                 r->res_master_nodeid = our_nodeid;
780                 r->res_nodeid = 0;
781         }
782
783         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
784         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
785         goto out_unlock;
786
787
788  do_new:
789         /*
790          * rsb not found
791          */
792
793         error = get_rsb_struct(ls, name, len, &r);
794         if (error == -EAGAIN) {
795                 spin_unlock(&ls->ls_rsbtbl[b].lock);
796                 goto retry;
797         }
798         if (error)
799                 goto out_unlock;
800
801         r->res_hash = hash;
802         r->res_bucket = b;
803         r->res_dir_nodeid = dir_nodeid;
804         r->res_master_nodeid = dir_nodeid;
805         r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
806         kref_init(&r->res_ref);
807
808         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
809  out_unlock:
810         spin_unlock(&ls->ls_rsbtbl[b].lock);
811  out:
812         *r_ret = r;
813         return error;
814 }
815
816 static int find_rsb(struct dlm_ls *ls, char *name, int len, int from_nodeid,
817                     unsigned int flags, struct dlm_rsb **r_ret)
818 {
819         uint32_t hash, b;
820         int dir_nodeid;
821
822         if (len > DLM_RESNAME_MAXLEN)
823                 return -EINVAL;
824
825         hash = jhash(name, len, 0);
826         b = hash & (ls->ls_rsbtbl_size - 1);
827
828         dir_nodeid = dlm_hash2nodeid(ls, hash);
829
830         if (dlm_no_directory(ls))
831                 return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid,
832                                       from_nodeid, flags, r_ret);
833         else
834                 return find_rsb_dir(ls, name, len, hash, b, dir_nodeid,
835                                       from_nodeid, flags, r_ret);
836 }
837
838 /* we have received a request and found that res_master_nodeid != our_nodeid,
839    so we need to return an error or make ourself the master */
840
841 static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
842                                   int from_nodeid)
843 {
844         if (dlm_no_directory(ls)) {
845                 log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
846                           from_nodeid, r->res_master_nodeid,
847                           r->res_dir_nodeid);
848                 dlm_print_rsb(r);
849                 return -ENOTBLK;
850         }
851
852         if (from_nodeid != r->res_dir_nodeid) {
853                 /* our rsb is not master, and another node (not the dir node)
854                    has sent us a request.  this is much more common when our
855                    master_nodeid is zero, so limit debug to non-zero.  */
856
857                 if (r->res_master_nodeid) {
858                         log_debug(ls, "validate master from_other %d master %d "
859                                   "dir %d first %x %s", from_nodeid,
860                                   r->res_master_nodeid, r->res_dir_nodeid,
861                                   r->res_first_lkid, r->res_name);
862                 }
863                 return -ENOTBLK;
864         } else {
865                 /* our rsb is not master, but the dir nodeid has sent us a
866                    request; this could happen with master 0 / res_nodeid -1 */
867
868                 if (r->res_master_nodeid) {
869                         log_error(ls, "validate master from_dir %d master %d "
870                                   "first %x %s",
871                                   from_nodeid, r->res_master_nodeid,
872                                   r->res_first_lkid, r->res_name);
873                 }
874
875                 r->res_master_nodeid = dlm_our_nodeid();
876                 r->res_nodeid = 0;
877                 return 0;
878         }
879 }
880
881 /*
882  * We're the dir node for this res and another node wants to know the
883  * master nodeid.  During normal operation (non recovery) this is only
884  * called from receive_lookup(); master lookups when the local node is
885  * the dir node are done by find_rsb().
886  *
887  * normal operation, we are the dir node for a resource
888  * . _request_lock
889  * . set_master
890  * . send_lookup
891  * . receive_lookup
892  * . dlm_master_lookup flags 0
893  *
894  * recover directory, we are rebuilding dir for all resources
895  * . dlm_recover_directory
896  * . dlm_rcom_names
897  *   remote node sends back the rsb names it is master of and we are dir of
898  * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
899  *   we either create new rsb setting remote node as master, or find existing
900  *   rsb and set master to be the remote node.
901  *
902  * recover masters, we are finding the new master for resources
903  * . dlm_recover_masters
904  * . recover_master
905  * . dlm_send_rcom_lookup
906  * . receive_rcom_lookup
907  * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
908  */
909
910 int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len,
911                       unsigned int flags, int *r_nodeid, int *result)
912 {
913         struct dlm_rsb *r = NULL;
914         uint32_t hash, b;
915         int from_master = (flags & DLM_LU_RECOVER_DIR);
916         int fix_master = (flags & DLM_LU_RECOVER_MASTER);
917         int our_nodeid = dlm_our_nodeid();
918         int dir_nodeid, error, toss_list = 0;
919
920         if (len > DLM_RESNAME_MAXLEN)
921                 return -EINVAL;
922
923         if (from_nodeid == our_nodeid) {
924                 log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
925                           our_nodeid, flags);
926                 return -EINVAL;
927         }
928
929         hash = jhash(name, len, 0);
930         b = hash & (ls->ls_rsbtbl_size - 1);
931
932         dir_nodeid = dlm_hash2nodeid(ls, hash);
933         if (dir_nodeid != our_nodeid) {
934                 log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
935                           from_nodeid, dir_nodeid, our_nodeid, hash,
936                           ls->ls_num_nodes);
937                 *r_nodeid = -1;
938                 return -EINVAL;
939         }
940
941  retry:
942         error = pre_rsb_struct(ls);
943         if (error < 0)
944                 return error;
945
946         spin_lock(&ls->ls_rsbtbl[b].lock);
947         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
948         if (!error) {
949                 /* because the rsb is active, we need to lock_rsb before
950                    checking/changing re_master_nodeid */
951
952                 hold_rsb(r);
953                 spin_unlock(&ls->ls_rsbtbl[b].lock);
954                 lock_rsb(r);
955                 goto found;
956         }
957
958         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
959         if (error)
960                 goto not_found;
961
962         /* because the rsb is inactive (on toss list), it's not refcounted
963            and lock_rsb is not used, but is protected by the rsbtbl lock */
964
965         toss_list = 1;
966  found:
967         if (r->res_dir_nodeid != our_nodeid) {
968                 /* should not happen, but may as well fix it and carry on */
969                 log_error(ls, "dlm_master_lookup res_dir %d our %d %s",
970                           r->res_dir_nodeid, our_nodeid, r->res_name);
971                 r->res_dir_nodeid = our_nodeid;
972         }
973
974         if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) {
975                 /* Recovery uses this function to set a new master when
976                    the previous master failed.  Setting NEW_MASTER will
977                    force dlm_recover_masters to call recover_master on this
978                    rsb even though the res_nodeid is no longer removed. */
979
980                 r->res_master_nodeid = from_nodeid;
981                 r->res_nodeid = from_nodeid;
982                 rsb_set_flag(r, RSB_NEW_MASTER);
983
984                 if (toss_list) {
985                         /* I don't think we should ever find it on toss list. */
986                         log_error(ls, "dlm_master_lookup fix_master on toss");
987                         dlm_dump_rsb(r);
988                 }
989         }
990
991         if (from_master && (r->res_master_nodeid != from_nodeid)) {
992                 /* this will happen if from_nodeid became master during
993                    a previous recovery cycle, and we aborted the previous
994                    cycle before recovering this master value */
995
996                 log_limit(ls, "dlm_master_lookup from_master %d "
997                           "master_nodeid %d res_nodeid %d first %x %s",
998                           from_nodeid, r->res_master_nodeid, r->res_nodeid,
999                           r->res_first_lkid, r->res_name);
1000
1001                 if (r->res_master_nodeid == our_nodeid) {
1002                         log_error(ls, "from_master %d our_master", from_nodeid);
1003                         dlm_dump_rsb(r);
1004                         goto out_found;
1005                 }
1006
1007                 r->res_master_nodeid = from_nodeid;
1008                 r->res_nodeid = from_nodeid;
1009                 rsb_set_flag(r, RSB_NEW_MASTER);
1010         }
1011
1012         if (!r->res_master_nodeid) {
1013                 /* this will happen if recovery happens while we're looking
1014                    up the master for this rsb */
1015
1016                 log_debug(ls, "dlm_master_lookup master 0 to %d first %x %s",
1017                           from_nodeid, r->res_first_lkid, r->res_name);
1018                 r->res_master_nodeid = from_nodeid;
1019                 r->res_nodeid = from_nodeid;
1020         }
1021
1022         if (!from_master && !fix_master &&
1023             (r->res_master_nodeid == from_nodeid)) {
1024                 /* this can happen when the master sends remove, the dir node
1025                    finds the rsb on the keep list and ignores the remove,
1026                    and the former master sends a lookup */
1027
1028                 log_limit(ls, "dlm_master_lookup from master %d flags %x "
1029                           "first %x %s", from_nodeid, flags,
1030                           r->res_first_lkid, r->res_name);
1031         }
1032
1033  out_found:
1034         *r_nodeid = r->res_master_nodeid;
1035         if (result)
1036                 *result = DLM_LU_MATCH;
1037
1038         if (toss_list) {
1039                 r->res_toss_time = jiffies;
1040                 /* the rsb was inactive (on toss list) */
1041                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1042         } else {
1043                 /* the rsb was active */
1044                 unlock_rsb(r);
1045                 put_rsb(r);
1046         }
1047         return 0;
1048
1049  not_found:
1050         error = get_rsb_struct(ls, name, len, &r);
1051         if (error == -EAGAIN) {
1052                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1053                 goto retry;
1054         }
1055         if (error)
1056                 goto out_unlock;
1057
1058         r->res_hash = hash;
1059         r->res_bucket = b;
1060         r->res_dir_nodeid = our_nodeid;
1061         r->res_master_nodeid = from_nodeid;
1062         r->res_nodeid = from_nodeid;
1063         kref_init(&r->res_ref);
1064         r->res_toss_time = jiffies;
1065
1066         error = rsb_insert(r, &ls->ls_rsbtbl[b].toss);
1067         if (error) {
1068                 /* should never happen */
1069                 dlm_free_rsb(r);
1070                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1071                 goto retry;
1072         }
1073
1074         if (result)
1075                 *result = DLM_LU_ADD;
1076         *r_nodeid = from_nodeid;
1077         error = 0;
1078  out_unlock:
1079         spin_unlock(&ls->ls_rsbtbl[b].lock);
1080         return error;
1081 }
1082
1083 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
1084 {
1085         struct rb_node *n;
1086         struct dlm_rsb *r;
1087         int i;
1088
1089         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1090                 spin_lock(&ls->ls_rsbtbl[i].lock);
1091                 for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
1092                         r = rb_entry(n, struct dlm_rsb, res_hashnode);
1093                         if (r->res_hash == hash)
1094                                 dlm_dump_rsb(r);
1095                 }
1096                 spin_unlock(&ls->ls_rsbtbl[i].lock);
1097         }
1098 }
1099
1100 void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len)
1101 {
1102         struct dlm_rsb *r = NULL;
1103         uint32_t hash, b;
1104         int error;
1105
1106         hash = jhash(name, len, 0);
1107         b = hash & (ls->ls_rsbtbl_size - 1);
1108
1109         spin_lock(&ls->ls_rsbtbl[b].lock);
1110         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
1111         if (!error)
1112                 goto out_dump;
1113
1114         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1115         if (error)
1116                 goto out;
1117  out_dump:
1118         dlm_dump_rsb(r);
1119  out:
1120         spin_unlock(&ls->ls_rsbtbl[b].lock);
1121 }
1122
1123 static void toss_rsb(struct kref *kref)
1124 {
1125         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1126         struct dlm_ls *ls = r->res_ls;
1127
1128         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
1129         kref_init(&r->res_ref);
1130         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
1131         rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
1132         r->res_toss_time = jiffies;
1133         ls->ls_rsbtbl[r->res_bucket].flags |= DLM_RTF_SHRINK;
1134         if (r->res_lvbptr) {
1135                 dlm_free_lvb(r->res_lvbptr);
1136                 r->res_lvbptr = NULL;
1137         }
1138 }
1139
1140 /* See comment for unhold_lkb */
1141
1142 static void unhold_rsb(struct dlm_rsb *r)
1143 {
1144         int rv;
1145         rv = kref_put(&r->res_ref, toss_rsb);
1146         DLM_ASSERT(!rv, dlm_dump_rsb(r););
1147 }
1148
1149 static void kill_rsb(struct kref *kref)
1150 {
1151         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1152
1153         /* All work is done after the return from kref_put() so we
1154            can release the write_lock before the remove and free. */
1155
1156         DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
1157         DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
1158         DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
1159         DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
1160         DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
1161         DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
1162 }
1163
1164 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
1165    The rsb must exist as long as any lkb's for it do. */
1166
1167 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1168 {
1169         hold_rsb(r);
1170         lkb->lkb_resource = r;
1171 }
1172
1173 static void detach_lkb(struct dlm_lkb *lkb)
1174 {
1175         if (lkb->lkb_resource) {
1176                 put_rsb(lkb->lkb_resource);
1177                 lkb->lkb_resource = NULL;
1178         }
1179 }
1180
1181 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
1182 {
1183         struct dlm_lkb *lkb;
1184         int rv;
1185
1186         lkb = dlm_allocate_lkb(ls);
1187         if (!lkb)
1188                 return -ENOMEM;
1189
1190         lkb->lkb_nodeid = -1;
1191         lkb->lkb_grmode = DLM_LOCK_IV;
1192         kref_init(&lkb->lkb_ref);
1193         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
1194         INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
1195         INIT_LIST_HEAD(&lkb->lkb_time_list);
1196         INIT_LIST_HEAD(&lkb->lkb_cb_list);
1197         mutex_init(&lkb->lkb_cb_mutex);
1198         INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
1199
1200         idr_preload(GFP_NOFS);
1201         spin_lock(&ls->ls_lkbidr_spin);
1202         rv = idr_alloc(&ls->ls_lkbidr, lkb, 1, 0, GFP_NOWAIT);
1203         if (rv >= 0)
1204                 lkb->lkb_id = rv;
1205         spin_unlock(&ls->ls_lkbidr_spin);
1206         idr_preload_end();
1207
1208         if (rv < 0) {
1209                 log_error(ls, "create_lkb idr error %d", rv);
1210                 dlm_free_lkb(lkb);
1211                 return rv;
1212         }
1213
1214         *lkb_ret = lkb;
1215         return 0;
1216 }
1217
1218 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
1219 {
1220         struct dlm_lkb *lkb;
1221
1222         spin_lock(&ls->ls_lkbidr_spin);
1223         lkb = idr_find(&ls->ls_lkbidr, lkid);
1224         if (lkb)
1225                 kref_get(&lkb->lkb_ref);
1226         spin_unlock(&ls->ls_lkbidr_spin);
1227
1228         *lkb_ret = lkb;
1229         return lkb ? 0 : -ENOENT;
1230 }
1231
1232 static void kill_lkb(struct kref *kref)
1233 {
1234         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1235
1236         /* All work is done after the return from kref_put() so we
1237            can release the write_lock before the detach_lkb */
1238
1239         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1240 }
1241
1242 /* __put_lkb() is used when an lkb may not have an rsb attached to
1243    it so we need to provide the lockspace explicitly */
1244
1245 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
1246 {
1247         uint32_t lkid = lkb->lkb_id;
1248
1249         spin_lock(&ls->ls_lkbidr_spin);
1250         if (kref_put(&lkb->lkb_ref, kill_lkb)) {
1251                 idr_remove(&ls->ls_lkbidr, lkid);
1252                 spin_unlock(&ls->ls_lkbidr_spin);
1253
1254                 detach_lkb(lkb);
1255
1256                 /* for local/process lkbs, lvbptr points to caller's lksb */
1257                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
1258                         dlm_free_lvb(lkb->lkb_lvbptr);
1259                 dlm_free_lkb(lkb);
1260                 return 1;
1261         } else {
1262                 spin_unlock(&ls->ls_lkbidr_spin);
1263                 return 0;
1264         }
1265 }
1266
1267 int dlm_put_lkb(struct dlm_lkb *lkb)
1268 {
1269         struct dlm_ls *ls;
1270
1271         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
1272         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
1273
1274         ls = lkb->lkb_resource->res_ls;
1275         return __put_lkb(ls, lkb);
1276 }
1277
1278 /* This is only called to add a reference when the code already holds
1279    a valid reference to the lkb, so there's no need for locking. */
1280
1281 static inline void hold_lkb(struct dlm_lkb *lkb)
1282 {
1283         kref_get(&lkb->lkb_ref);
1284 }
1285
1286 /* This is called when we need to remove a reference and are certain
1287    it's not the last ref.  e.g. del_lkb is always called between a
1288    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
1289    put_lkb would work fine, but would involve unnecessary locking */
1290
1291 static inline void unhold_lkb(struct dlm_lkb *lkb)
1292 {
1293         int rv;
1294         rv = kref_put(&lkb->lkb_ref, kill_lkb);
1295         DLM_ASSERT(!rv, dlm_print_lkb(lkb););
1296 }
1297
1298 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
1299                             int mode)
1300 {
1301         struct dlm_lkb *lkb = NULL;
1302
1303         list_for_each_entry(lkb, head, lkb_statequeue)
1304                 if (lkb->lkb_rqmode < mode)
1305                         break;
1306
1307         __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
1308 }
1309
1310 /* add/remove lkb to rsb's grant/convert/wait queue */
1311
1312 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
1313 {
1314         kref_get(&lkb->lkb_ref);
1315
1316         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1317
1318         lkb->lkb_timestamp = ktime_get();
1319
1320         lkb->lkb_status = status;
1321
1322         switch (status) {
1323         case DLM_LKSTS_WAITING:
1324                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1325                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
1326                 else
1327                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
1328                 break;
1329         case DLM_LKSTS_GRANTED:
1330                 /* convention says granted locks kept in order of grmode */
1331                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
1332                                 lkb->lkb_grmode);
1333                 break;
1334         case DLM_LKSTS_CONVERT:
1335                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1336                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
1337                 else
1338                         list_add_tail(&lkb->lkb_statequeue,
1339                                       &r->res_convertqueue);
1340                 break;
1341         default:
1342                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
1343         }
1344 }
1345
1346 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1347 {
1348         lkb->lkb_status = 0;
1349         list_del(&lkb->lkb_statequeue);
1350         unhold_lkb(lkb);
1351 }
1352
1353 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
1354 {
1355         hold_lkb(lkb);
1356         del_lkb(r, lkb);
1357         add_lkb(r, lkb, sts);
1358         unhold_lkb(lkb);
1359 }
1360
1361 static int msg_reply_type(int mstype)
1362 {
1363         switch (mstype) {
1364         case DLM_MSG_REQUEST:
1365                 return DLM_MSG_REQUEST_REPLY;
1366         case DLM_MSG_CONVERT:
1367                 return DLM_MSG_CONVERT_REPLY;
1368         case DLM_MSG_UNLOCK:
1369                 return DLM_MSG_UNLOCK_REPLY;
1370         case DLM_MSG_CANCEL:
1371                 return DLM_MSG_CANCEL_REPLY;
1372         case DLM_MSG_LOOKUP:
1373                 return DLM_MSG_LOOKUP_REPLY;
1374         }
1375         return -1;
1376 }
1377
1378 static int nodeid_warned(int nodeid, int num_nodes, int *warned)
1379 {
1380         int i;
1381
1382         for (i = 0; i < num_nodes; i++) {
1383                 if (!warned[i]) {
1384                         warned[i] = nodeid;
1385                         return 0;
1386                 }
1387                 if (warned[i] == nodeid)
1388                         return 1;
1389         }
1390         return 0;
1391 }
1392
1393 void dlm_scan_waiters(struct dlm_ls *ls)
1394 {
1395         struct dlm_lkb *lkb;
1396         s64 us;
1397         s64 debug_maxus = 0;
1398         u32 debug_scanned = 0;
1399         u32 debug_expired = 0;
1400         int num_nodes = 0;
1401         int *warned = NULL;
1402
1403         if (!dlm_config.ci_waitwarn_us)
1404                 return;
1405
1406         mutex_lock(&ls->ls_waiters_mutex);
1407
1408         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1409                 if (!lkb->lkb_wait_time)
1410                         continue;
1411
1412                 debug_scanned++;
1413
1414                 us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));
1415
1416                 if (us < dlm_config.ci_waitwarn_us)
1417                         continue;
1418
1419                 lkb->lkb_wait_time = 0;
1420
1421                 debug_expired++;
1422                 if (us > debug_maxus)
1423                         debug_maxus = us;
1424
1425                 if (!num_nodes) {
1426                         num_nodes = ls->ls_num_nodes;
1427                         warned = kcalloc(num_nodes, sizeof(int), GFP_KERNEL);
1428                 }
1429                 if (!warned)
1430                         continue;
1431                 if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
1432                         continue;
1433
1434                 log_error(ls, "waitwarn %x %lld %d us check connection to "
1435                           "node %d", lkb->lkb_id, (long long)us,
1436                           dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
1437         }
1438         mutex_unlock(&ls->ls_waiters_mutex);
1439         kfree(warned);
1440
1441         if (debug_expired)
1442                 log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
1443                           debug_scanned, debug_expired,
1444                           dlm_config.ci_waitwarn_us, (long long)debug_maxus);
1445 }
1446
1447 /* add/remove lkb from global waiters list of lkb's waiting for
1448    a reply from a remote node */
1449
1450 static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
1451 {
1452         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1453         int error = 0;
1454
1455         mutex_lock(&ls->ls_waiters_mutex);
1456
1457         if (is_overlap_unlock(lkb) ||
1458             (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
1459                 error = -EINVAL;
1460                 goto out;
1461         }
1462
1463         if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
1464                 switch (mstype) {
1465                 case DLM_MSG_UNLOCK:
1466                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1467                         break;
1468                 case DLM_MSG_CANCEL:
1469                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1470                         break;
1471                 default:
1472                         error = -EBUSY;
1473                         goto out;
1474                 }
1475                 lkb->lkb_wait_count++;
1476                 hold_lkb(lkb);
1477
1478                 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
1479                           lkb->lkb_id, lkb->lkb_wait_type, mstype,
1480                           lkb->lkb_wait_count, lkb->lkb_flags);
1481                 goto out;
1482         }
1483
1484         DLM_ASSERT(!lkb->lkb_wait_count,
1485                    dlm_print_lkb(lkb);
1486                    printk("wait_count %d\n", lkb->lkb_wait_count););
1487
1488         lkb->lkb_wait_count++;
1489         lkb->lkb_wait_type = mstype;
1490         lkb->lkb_wait_time = ktime_get();
1491         lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
1492         hold_lkb(lkb);
1493         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
1494  out:
1495         if (error)
1496                 log_error(ls, "addwait error %x %d flags %x %d %d %s",
1497                           lkb->lkb_id, error, lkb->lkb_flags, mstype,
1498                           lkb->lkb_wait_type, lkb->lkb_resource->res_name);
1499         mutex_unlock(&ls->ls_waiters_mutex);
1500         return error;
1501 }
1502
1503 /* We clear the RESEND flag because we might be taking an lkb off the waiters
1504    list as part of process_requestqueue (e.g. a lookup that has an optimized
1505    request reply on the requestqueue) between dlm_recover_waiters_pre() which
1506    set RESEND and dlm_recover_waiters_post() */
1507
1508 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1509                                 struct dlm_message *ms)
1510 {
1511         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1512         int overlap_done = 0;
1513
1514         if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
1515                 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1516                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
1517                 overlap_done = 1;
1518                 goto out_del;
1519         }
1520
1521         if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
1522                 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
1523                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
1524                 overlap_done = 1;
1525                 goto out_del;
1526         }
1527
1528         /* Cancel state was preemptively cleared by a successful convert,
1529            see next comment, nothing to do. */
1530
1531         if ((mstype == DLM_MSG_CANCEL_REPLY) &&
1532             (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
1533                 log_debug(ls, "remwait %x cancel_reply wait_type %d",
1534                           lkb->lkb_id, lkb->lkb_wait_type);
1535                 return -1;
1536         }
1537
1538         /* Remove for the convert reply, and premptively remove for the
1539            cancel reply.  A convert has been granted while there's still
1540            an outstanding cancel on it (the cancel is moot and the result
1541            in the cancel reply should be 0).  We preempt the cancel reply
1542            because the app gets the convert result and then can follow up
1543            with another op, like convert.  This subsequent op would see the
1544            lingering state of the cancel and fail with -EBUSY. */
1545
1546         if ((mstype == DLM_MSG_CONVERT_REPLY) &&
1547             (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
1548             is_overlap_cancel(lkb) && ms && !ms->m_result) {
1549                 log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
1550                           lkb->lkb_id);
1551                 lkb->lkb_wait_type = 0;
1552                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
1553                 lkb->lkb_wait_count--;
1554                 unhold_lkb(lkb);
1555                 goto out_del;
1556         }
1557
1558         /* N.B. type of reply may not always correspond to type of original
1559            msg due to lookup->request optimization, verify others? */
1560
1561         if (lkb->lkb_wait_type) {
1562                 lkb->lkb_wait_type = 0;
1563                 goto out_del;
1564         }
1565
1566         log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
1567                   lkb->lkb_id, ms ? ms->m_header.h_nodeid : 0, lkb->lkb_remid,
1568                   mstype, lkb->lkb_flags);
1569         return -1;
1570
1571  out_del:
1572         /* the force-unlock/cancel has completed and we haven't recvd a reply
1573            to the op that was in progress prior to the unlock/cancel; we
1574            give up on any reply to the earlier op.  FIXME: not sure when/how
1575            this would happen */
1576
1577         if (overlap_done && lkb->lkb_wait_type) {
1578                 log_error(ls, "remwait error %x reply %d wait_type %d overlap",
1579                           lkb->lkb_id, mstype, lkb->lkb_wait_type);
1580                 lkb->lkb_wait_count--;
1581                 unhold_lkb(lkb);
1582                 lkb->lkb_wait_type = 0;
1583         }
1584
1585         DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
1586
1587         lkb->lkb_flags &= ~DLM_IFL_RESEND;
1588         lkb->lkb_wait_count--;
1589         if (!lkb->lkb_wait_count)
1590                 list_del_init(&lkb->lkb_wait_reply);
1591         unhold_lkb(lkb);
1592         return 0;
1593 }
1594
1595 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1596 {
1597         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1598         int error;
1599
1600         mutex_lock(&ls->ls_waiters_mutex);
1601         error = _remove_from_waiters(lkb, mstype, NULL);
1602         mutex_unlock(&ls->ls_waiters_mutex);
1603         return error;
1604 }
1605
1606 /* Handles situations where we might be processing a "fake" or "stub" reply in
1607    which we can't try to take waiters_mutex again. */
1608
1609 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
1610 {
1611         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1612         int error;
1613
1614         if (ms->m_flags != DLM_IFL_STUB_MS)
1615                 mutex_lock(&ls->ls_waiters_mutex);
1616         error = _remove_from_waiters(lkb, ms->m_type, ms);
1617         if (ms->m_flags != DLM_IFL_STUB_MS)
1618                 mutex_unlock(&ls->ls_waiters_mutex);
1619         return error;
1620 }
1621
1622 /* If there's an rsb for the same resource being removed, ensure
1623    that the remove message is sent before the new lookup message.
1624    It should be rare to need a delay here, but if not, then it may
1625    be worthwhile to add a proper wait mechanism rather than a delay. */
1626
1627 static void wait_pending_remove(struct dlm_rsb *r)
1628 {
1629         struct dlm_ls *ls = r->res_ls;
1630  restart:
1631         spin_lock(&ls->ls_remove_spin);
1632         if (ls->ls_remove_len &&
1633             !rsb_cmp(r, ls->ls_remove_name, ls->ls_remove_len)) {
1634                 log_debug(ls, "delay lookup for remove dir %d %s",
1635                           r->res_dir_nodeid, r->res_name);
1636                 spin_unlock(&ls->ls_remove_spin);
1637                 msleep(1);
1638                 goto restart;
1639         }
1640         spin_unlock(&ls->ls_remove_spin);
1641 }
1642
1643 /*
1644  * ls_remove_spin protects ls_remove_name and ls_remove_len which are
1645  * read by other threads in wait_pending_remove.  ls_remove_names
1646  * and ls_remove_lens are only used by the scan thread, so they do
1647  * not need protection.
1648  */
1649
1650 static void shrink_bucket(struct dlm_ls *ls, int b)
1651 {
1652         struct rb_node *n, *next;
1653         struct dlm_rsb *r;
1654         char *name;
1655         int our_nodeid = dlm_our_nodeid();
1656         int remote_count = 0;
1657         int need_shrink = 0;
1658         int i, len, rv;
1659
1660         memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);
1661
1662         spin_lock(&ls->ls_rsbtbl[b].lock);
1663
1664         if (!(ls->ls_rsbtbl[b].flags & DLM_RTF_SHRINK)) {
1665                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1666                 return;
1667         }
1668
1669         for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) {
1670                 next = rb_next(n);
1671                 r = rb_entry(n, struct dlm_rsb, res_hashnode);
1672
1673                 /* If we're the directory record for this rsb, and
1674                    we're not the master of it, then we need to wait
1675                    for the master node to send us a dir remove for
1676                    before removing the dir record. */
1677
1678                 if (!dlm_no_directory(ls) &&
1679                     (r->res_master_nodeid != our_nodeid) &&
1680                     (dlm_dir_nodeid(r) == our_nodeid)) {
1681                         continue;
1682                 }
1683
1684                 need_shrink = 1;
1685
1686                 if (!time_after_eq(jiffies, r->res_toss_time +
1687                                    dlm_config.ci_toss_secs * HZ)) {
1688                         continue;
1689                 }
1690
1691                 if (!dlm_no_directory(ls) &&
1692                     (r->res_master_nodeid == our_nodeid) &&
1693                     (dlm_dir_nodeid(r) != our_nodeid)) {
1694
1695                         /* We're the master of this rsb but we're not
1696                            the directory record, so we need to tell the
1697                            dir node to remove the dir record. */
1698
1699                         ls->ls_remove_lens[remote_count] = r->res_length;
1700                         memcpy(ls->ls_remove_names[remote_count], r->res_name,
1701                                DLM_RESNAME_MAXLEN);
1702                         remote_count++;
1703
1704                         if (remote_count >= DLM_REMOVE_NAMES_MAX)
1705                                 break;
1706                         continue;
1707                 }
1708
1709                 if (!kref_put(&r->res_ref, kill_rsb)) {
1710                         log_error(ls, "tossed rsb in use %s", r->res_name);
1711                         continue;
1712                 }
1713
1714                 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1715                 dlm_free_rsb(r);
1716         }
1717
1718         if (need_shrink)
1719                 ls->ls_rsbtbl[b].flags |= DLM_RTF_SHRINK;
1720         else
1721                 ls->ls_rsbtbl[b].flags &= ~DLM_RTF_SHRINK;
1722         spin_unlock(&ls->ls_rsbtbl[b].lock);
1723
1724         /*
1725          * While searching for rsb's to free, we found some that require
1726          * remote removal.  We leave them in place and find them again here
1727          * so there is a very small gap between removing them from the toss
1728          * list and sending the removal.  Keeping this gap small is
1729          * important to keep us (the master node) from being out of sync
1730          * with the remote dir node for very long.
1731          *
1732          * From the time the rsb is removed from toss until just after
1733          * send_remove, the rsb name is saved in ls_remove_name.  A new
1734          * lookup checks this to ensure that a new lookup message for the
1735          * same resource name is not sent just before the remove message.
1736          */
1737
1738         for (i = 0; i < remote_count; i++) {
1739                 name = ls->ls_remove_names[i];
1740                 len = ls->ls_remove_lens[i];
1741
1742                 spin_lock(&ls->ls_rsbtbl[b].lock);
1743                 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1744                 if (rv) {
1745                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1746                         log_debug(ls, "remove_name not toss %s", name);
1747                         continue;
1748                 }
1749
1750                 if (r->res_master_nodeid != our_nodeid) {
1751                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1752                         log_debug(ls, "remove_name master %d dir %d our %d %s",
1753                                   r->res_master_nodeid, r->res_dir_nodeid,
1754                                   our_nodeid, name);
1755                         continue;
1756                 }
1757
1758                 if (r->res_dir_nodeid == our_nodeid) {
1759                         /* should never happen */
1760                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1761                         log_error(ls, "remove_name dir %d master %d our %d %s",
1762                                   r->res_dir_nodeid, r->res_master_nodeid,
1763                                   our_nodeid, name);
1764                         continue;
1765                 }
1766
1767                 if (!time_after_eq(jiffies, r->res_toss_time +
1768                                    dlm_config.ci_toss_secs * HZ)) {
1769                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1770                         log_debug(ls, "remove_name toss_time %lu now %lu %s",
1771                                   r->res_toss_time, jiffies, name);
1772                         continue;
1773                 }
1774
1775                 if (!kref_put(&r->res_ref, kill_rsb)) {
1776                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1777                         log_error(ls, "remove_name in use %s", name);
1778                         continue;
1779                 }
1780
1781                 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1782
1783                 /* block lookup of same name until we've sent remove */
1784                 spin_lock(&ls->ls_remove_spin);
1785                 ls->ls_remove_len = len;
1786                 memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
1787                 spin_unlock(&ls->ls_remove_spin);
1788                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1789
1790                 send_remove(r);
1791
1792                 /* allow lookup of name again */
1793                 spin_lock(&ls->ls_remove_spin);
1794                 ls->ls_remove_len = 0;
1795                 memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
1796                 spin_unlock(&ls->ls_remove_spin);
1797
1798                 dlm_free_rsb(r);
1799         }
1800 }
1801
1802 void dlm_scan_rsbs(struct dlm_ls *ls)
1803 {
1804         int i;
1805
1806         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1807                 shrink_bucket(ls, i);
1808                 if (dlm_locking_stopped(ls))
1809                         break;
1810                 cond_resched();
1811         }
1812 }
1813
1814 static void add_timeout(struct dlm_lkb *lkb)
1815 {
1816         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1817
1818         if (is_master_copy(lkb))
1819                 return;
1820
1821         if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1822             !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1823                 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1824                 goto add_it;
1825         }
1826         if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1827                 goto add_it;
1828         return;
1829
1830  add_it:
1831         DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1832         mutex_lock(&ls->ls_timeout_mutex);
1833         hold_lkb(lkb);
1834         list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1835         mutex_unlock(&ls->ls_timeout_mutex);
1836 }
1837
1838 static void del_timeout(struct dlm_lkb *lkb)
1839 {
1840         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1841
1842         mutex_lock(&ls->ls_timeout_mutex);
1843         if (!list_empty(&lkb->lkb_time_list)) {
1844                 list_del_init(&lkb->lkb_time_list);
1845                 unhold_lkb(lkb);
1846         }
1847         mutex_unlock(&ls->ls_timeout_mutex);
1848 }
1849
1850 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1851    lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1852    and then lock rsb because of lock ordering in add_timeout.  We may need
1853    to specify some special timeout-related bits in the lkb that are just to
1854    be accessed under the timeout_mutex. */
1855
1856 void dlm_scan_timeout(struct dlm_ls *ls)
1857 {
1858         struct dlm_rsb *r;
1859         struct dlm_lkb *lkb = NULL, *iter;
1860         int do_cancel, do_warn;
1861         s64 wait_us;
1862
1863         for (;;) {
1864                 if (dlm_locking_stopped(ls))
1865                         break;
1866
1867                 do_cancel = 0;
1868                 do_warn = 0;
1869                 mutex_lock(&ls->ls_timeout_mutex);
1870                 list_for_each_entry(iter, &ls->ls_timeout, lkb_time_list) {
1871
1872                         wait_us = ktime_to_us(ktime_sub(ktime_get(),
1873                                                         iter->lkb_timestamp));
1874
1875                         if ((iter->lkb_exflags & DLM_LKF_TIMEOUT) &&
1876                             wait_us >= (iter->lkb_timeout_cs * 10000))
1877                                 do_cancel = 1;
1878
1879                         if ((iter->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1880                             wait_us >= dlm_config.ci_timewarn_cs * 10000)
1881                                 do_warn = 1;
1882
1883                         if (!do_cancel && !do_warn)
1884                                 continue;
1885                         hold_lkb(iter);
1886                         lkb = iter;
1887                         break;
1888                 }
1889                 mutex_unlock(&ls->ls_timeout_mutex);
1890
1891                 if (!lkb)
1892                         break;
1893
1894                 r = lkb->lkb_resource;
1895                 hold_rsb(r);
1896                 lock_rsb(r);
1897
1898                 if (do_warn) {
1899                         /* clear flag so we only warn once */
1900                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1901                         if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1902                                 del_timeout(lkb);
1903                         dlm_timeout_warn(lkb);
1904                 }
1905
1906                 if (do_cancel) {
1907                         log_debug(ls, "timeout cancel %x node %d %s",
1908                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1909                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1910                         lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1911                         del_timeout(lkb);
1912                         _cancel_lock(r, lkb);
1913                 }
1914
1915                 unlock_rsb(r);
1916                 unhold_rsb(r);
1917                 dlm_put_lkb(lkb);
1918         }
1919 }
1920
1921 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1922    dlm_recoverd before checking/setting ls_recover_begin. */
1923
1924 void dlm_adjust_timeouts(struct dlm_ls *ls)
1925 {
1926         struct dlm_lkb *lkb;
1927         u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
1928
1929         ls->ls_recover_begin = 0;
1930         mutex_lock(&ls->ls_timeout_mutex);
1931         list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1932                 lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1933         mutex_unlock(&ls->ls_timeout_mutex);
1934
1935         if (!dlm_config.ci_waitwarn_us)
1936                 return;
1937
1938         mutex_lock(&ls->ls_waiters_mutex);
1939         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1940                 if (ktime_to_us(lkb->lkb_wait_time))
1941                         lkb->lkb_wait_time = ktime_get();
1942         }
1943         mutex_unlock(&ls->ls_waiters_mutex);
1944 }
1945
1946 /* lkb is master or local copy */
1947
1948 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1949 {
1950         int b, len = r->res_ls->ls_lvblen;
1951
1952         /* b=1 lvb returned to caller
1953            b=0 lvb written to rsb or invalidated
1954            b=-1 do nothing */
1955
1956         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1957
1958         if (b == 1) {
1959                 if (!lkb->lkb_lvbptr)
1960                         return;
1961
1962                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1963                         return;
1964
1965                 if (!r->res_lvbptr)
1966                         return;
1967
1968                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1969                 lkb->lkb_lvbseq = r->res_lvbseq;
1970
1971         } else if (b == 0) {
1972                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1973                         rsb_set_flag(r, RSB_VALNOTVALID);
1974                         return;
1975                 }
1976
1977                 if (!lkb->lkb_lvbptr)
1978                         return;
1979
1980                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1981                         return;
1982
1983                 if (!r->res_lvbptr)
1984                         r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1985
1986                 if (!r->res_lvbptr)
1987                         return;
1988
1989                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1990                 r->res_lvbseq++;
1991                 lkb->lkb_lvbseq = r->res_lvbseq;
1992                 rsb_clear_flag(r, RSB_VALNOTVALID);
1993         }
1994
1995         if (rsb_flag(r, RSB_VALNOTVALID))
1996                 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1997 }
1998
1999 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2000 {
2001         if (lkb->lkb_grmode < DLM_LOCK_PW)
2002                 return;
2003
2004         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
2005                 rsb_set_flag(r, RSB_VALNOTVALID);
2006                 return;
2007         }
2008
2009         if (!lkb->lkb_lvbptr)
2010                 return;
2011
2012         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
2013                 return;
2014
2015         if (!r->res_lvbptr)
2016                 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
2017
2018         if (!r->res_lvbptr)
2019                 return;
2020
2021         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2022         r->res_lvbseq++;
2023         rsb_clear_flag(r, RSB_VALNOTVALID);
2024 }
2025
2026 /* lkb is process copy (pc) */
2027
2028 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2029                             struct dlm_message *ms)
2030 {
2031         int b;
2032
2033         if (!lkb->lkb_lvbptr)
2034                 return;
2035
2036         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
2037                 return;
2038
2039         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
2040         if (b == 1) {
2041                 int len = receive_extralen(ms);
2042                 if (len > r->res_ls->ls_lvblen)
2043                         len = r->res_ls->ls_lvblen;
2044                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2045                 lkb->lkb_lvbseq = ms->m_lvbseq;
2046         }
2047 }
2048
2049 /* Manipulate lkb's on rsb's convert/granted/waiting queues
2050    remove_lock -- used for unlock, removes lkb from granted
2051    revert_lock -- used for cancel, moves lkb from convert to granted
2052    grant_lock  -- used for request and convert, adds lkb to granted or
2053                   moves lkb from convert or waiting to granted
2054
2055    Each of these is used for master or local copy lkb's.  There is
2056    also a _pc() variation used to make the corresponding change on
2057    a process copy (pc) lkb. */
2058
2059 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2060 {
2061         del_lkb(r, lkb);
2062         lkb->lkb_grmode = DLM_LOCK_IV;
2063         /* this unhold undoes the original ref from create_lkb()
2064            so this leads to the lkb being freed */
2065         unhold_lkb(lkb);
2066 }
2067
2068 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2069 {
2070         set_lvb_unlock(r, lkb);
2071         _remove_lock(r, lkb);
2072 }
2073
2074 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2075 {
2076         _remove_lock(r, lkb);
2077 }
2078
2079 /* returns: 0 did nothing
2080             1 moved lock to granted
2081            -1 removed lock */
2082
2083 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2084 {
2085         int rv = 0;
2086
2087         lkb->lkb_rqmode = DLM_LOCK_IV;
2088
2089         switch (lkb->lkb_status) {
2090         case DLM_LKSTS_GRANTED:
2091                 break;
2092         case DLM_LKSTS_CONVERT:
2093                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2094                 rv = 1;
2095                 break;
2096         case DLM_LKSTS_WAITING:
2097                 del_lkb(r, lkb);
2098                 lkb->lkb_grmode = DLM_LOCK_IV;
2099                 /* this unhold undoes the original ref from create_lkb()
2100                    so this leads to the lkb being freed */
2101                 unhold_lkb(lkb);
2102                 rv = -1;
2103                 break;
2104         default:
2105                 log_print("invalid status for revert %d", lkb->lkb_status);
2106         }
2107         return rv;
2108 }
2109
2110 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2111 {
2112         return revert_lock(r, lkb);
2113 }
2114
2115 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2116 {
2117         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
2118                 lkb->lkb_grmode = lkb->lkb_rqmode;
2119                 if (lkb->lkb_status)
2120                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2121                 else
2122                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
2123         }
2124
2125         lkb->lkb_rqmode = DLM_LOCK_IV;
2126         lkb->lkb_highbast = 0;
2127 }
2128
2129 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2130 {
2131         set_lvb_lock(r, lkb);
2132         _grant_lock(r, lkb);
2133 }
2134
2135 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2136                           struct dlm_message *ms)
2137 {
2138         set_lvb_lock_pc(r, lkb, ms);
2139         _grant_lock(r, lkb);
2140 }
2141
2142 /* called by grant_pending_locks() which means an async grant message must
2143    be sent to the requesting node in addition to granting the lock if the
2144    lkb belongs to a remote node. */
2145
2146 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
2147 {
2148         grant_lock(r, lkb);
2149         if (is_master_copy(lkb))
2150                 send_grant(r, lkb);
2151         else
2152                 queue_cast(r, lkb, 0);
2153 }
2154
2155 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
2156    change the granted/requested modes.  We're munging things accordingly in
2157    the process copy.
2158    CONVDEADLK: our grmode may have been forced down to NL to resolve a
2159    conversion deadlock
2160    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
2161    compatible with other granted locks */
2162
2163 static void munge_demoted(struct dlm_lkb *lkb)
2164 {
2165         if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
2166                 log_print("munge_demoted %x invalid modes gr %d rq %d",
2167                           lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
2168                 return;
2169         }
2170
2171         lkb->lkb_grmode = DLM_LOCK_NL;
2172 }
2173
2174 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
2175 {
2176         if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
2177             ms->m_type != DLM_MSG_GRANT) {
2178                 log_print("munge_altmode %x invalid reply type %d",
2179                           lkb->lkb_id, ms->m_type);
2180                 return;
2181         }
2182
2183         if (lkb->lkb_exflags & DLM_LKF_ALTPR)
2184                 lkb->lkb_rqmode = DLM_LOCK_PR;
2185         else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
2186                 lkb->lkb_rqmode = DLM_LOCK_CW;
2187         else {
2188                 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
2189                 dlm_print_lkb(lkb);
2190         }
2191 }
2192
2193 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
2194 {
2195         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
2196                                            lkb_statequeue);
2197         if (lkb->lkb_id == first->lkb_id)
2198                 return 1;
2199
2200         return 0;
2201 }
2202
2203 /* Check if the given lkb conflicts with another lkb on the queue. */
2204
2205 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
2206 {
2207         struct dlm_lkb *this;
2208
2209         list_for_each_entry(this, head, lkb_statequeue) {
2210                 if (this == lkb)
2211                         continue;
2212                 if (!modes_compat(this, lkb))
2213                         return 1;
2214         }
2215         return 0;
2216 }
2217
2218 /*
2219  * "A conversion deadlock arises with a pair of lock requests in the converting
2220  * queue for one resource.  The granted mode of each lock blocks the requested
2221  * mode of the other lock."
2222  *
2223  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
2224  * convert queue from being granted, then deadlk/demote lkb.
2225  *
2226  * Example:
2227  * Granted Queue: empty
2228  * Convert Queue: NL->EX (first lock)
2229  *                PR->EX (second lock)
2230  *
2231  * The first lock can't be granted because of the granted mode of the second
2232  * lock and the second lock can't be granted because it's not first in the
2233  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
2234  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
2235  * flag set and return DEMOTED in the lksb flags.
2236  *
2237  * Originally, this function detected conv-deadlk in a more limited scope:
2238  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
2239  * - if lkb1 was the first entry in the queue (not just earlier), and was
2240  *   blocked by the granted mode of lkb2, and there was nothing on the
2241  *   granted queue preventing lkb1 from being granted immediately, i.e.
2242  *   lkb2 was the only thing preventing lkb1 from being granted.
2243  *
2244  * That second condition meant we'd only say there was conv-deadlk if
2245  * resolving it (by demotion) would lead to the first lock on the convert
2246  * queue being granted right away.  It allowed conversion deadlocks to exist
2247  * between locks on the convert queue while they couldn't be granted anyway.
2248  *
2249  * Now, we detect and take action on conversion deadlocks immediately when
2250  * they're created, even if they may not be immediately consequential.  If
2251  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
2252  * mode that would prevent lkb1's conversion from being granted, we do a
2253  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
2254  * I think this means that the lkb_is_ahead condition below should always
2255  * be zero, i.e. there will never be conv-deadlk between two locks that are
2256  * both already on the convert queue.
2257  */
2258
2259 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
2260 {
2261         struct dlm_lkb *lkb1;
2262         int lkb_is_ahead = 0;
2263
2264         list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
2265                 if (lkb1 == lkb2) {
2266                         lkb_is_ahead = 1;
2267                         continue;
2268                 }
2269
2270                 if (!lkb_is_ahead) {
2271                         if (!modes_compat(lkb2, lkb1))
2272                                 return 1;
2273                 } else {
2274                         if (!modes_compat(lkb2, lkb1) &&
2275                             !modes_compat(lkb1, lkb2))
2276                                 return 1;
2277                 }
2278         }
2279         return 0;
2280 }
2281
2282 /*
2283  * Return 1 if the lock can be granted, 0 otherwise.
2284  * Also detect and resolve conversion deadlocks.
2285  *
2286  * lkb is the lock to be granted
2287  *
2288  * now is 1 if the function is being called in the context of the
2289  * immediate request, it is 0 if called later, after the lock has been
2290  * queued.
2291  *
2292  * recover is 1 if dlm_recover_grant() is trying to grant conversions
2293  * after recovery.
2294  *
2295  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
2296  */
2297
2298 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2299                            int recover)
2300 {
2301         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
2302
2303         /*
2304          * 6-10: Version 5.4 introduced an option to address the phenomenon of
2305          * a new request for a NL mode lock being blocked.
2306          *
2307          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
2308          * request, then it would be granted.  In essence, the use of this flag
2309          * tells the Lock Manager to expedite theis request by not considering
2310          * what may be in the CONVERTING or WAITING queues...  As of this
2311          * writing, the EXPEDITE flag can be used only with new requests for NL
2312          * mode locks.  This flag is not valid for conversion requests.
2313          *
2314          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
2315          * conversion or used with a non-NL requested mode.  We also know an
2316          * EXPEDITE request is always granted immediately, so now must always
2317          * be 1.  The full condition to grant an expedite request: (now &&
2318          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
2319          * therefore be shortened to just checking the flag.
2320          */
2321
2322         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
2323                 return 1;
2324
2325         /*
2326          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
2327          * added to the remaining conditions.
2328          */
2329
2330         if (queue_conflict(&r->res_grantqueue, lkb))
2331                 return 0;
2332
2333         /*
2334          * 6-3: By default, a conversion request is immediately granted if the
2335          * requested mode is compatible with the modes of all other granted
2336          * locks
2337          */
2338
2339         if (queue_conflict(&r->res_convertqueue, lkb))
2340                 return 0;
2341
2342         /*
2343          * The RECOVER_GRANT flag means dlm_recover_grant() is granting
2344          * locks for a recovered rsb, on which lkb's have been rebuilt.
2345          * The lkb's may have been rebuilt on the queues in a different
2346          * order than they were in on the previous master.  So, granting
2347          * queued conversions in order after recovery doesn't make sense
2348          * since the order hasn't been preserved anyway.  The new order
2349          * could also have created a new "in place" conversion deadlock.
2350          * (e.g. old, failed master held granted EX, with PR->EX, NL->EX.
2351          * After recovery, there would be no granted locks, and possibly
2352          * NL->EX, PR->EX, an in-place conversion deadlock.)  So, after
2353          * recovery, grant conversions without considering order.
2354          */
2355
2356         if (conv && recover)
2357                 return 1;
2358
2359         /*
2360          * 6-5: But the default algorithm for deciding whether to grant or
2361          * queue conversion requests does not by itself guarantee that such
2362          * requests are serviced on a "first come first serve" basis.  This, in
2363          * turn, can lead to a phenomenon known as "indefinate postponement".
2364          *
2365          * 6-7: This issue is dealt with by using the optional QUECVT flag with
2366          * the system service employed to request a lock conversion.  This flag
2367          * forces certain conversion requests to be queued, even if they are
2368          * compatible with the granted modes of other locks on the same
2369          * resource.  Thus, the use of this flag results in conversion requests
2370          * being ordered on a "first come first servce" basis.
2371          *
2372          * DCT: This condition is all about new conversions being able to occur
2373          * "in place" while the lock remains on the granted queue (assuming
2374          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
2375          * doesn't _have_ to go onto the convert queue where it's processed in
2376          * order.  The "now" variable is necessary to distinguish converts
2377          * being received and processed for the first time now, because once a
2378          * convert is moved to the conversion queue the condition below applies
2379          * requiring fifo granting.
2380          */
2381
2382         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
2383                 return 1;
2384
2385         /*
2386          * Even if the convert is compat with all granted locks,
2387          * QUECVT forces it behind other locks on the convert queue.
2388          */
2389
2390         if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
2391                 if (list_empty(&r->res_convertqueue))
2392                         return 1;
2393                 else
2394                         return 0;
2395         }
2396
2397         /*
2398          * The NOORDER flag is set to avoid the standard vms rules on grant
2399          * order.
2400          */
2401
2402         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
2403                 return 1;
2404
2405         /*
2406          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
2407          * granted until all other conversion requests ahead of it are granted
2408          * and/or canceled.
2409          */
2410
2411         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
2412                 return 1;
2413
2414         /*
2415          * 6-4: By default, a new request is immediately granted only if all
2416          * three of the following conditions are satisfied when the request is
2417          * issued:
2418          * - The queue of ungranted conversion requests for the resource is
2419          *   empty.
2420          * - The queue of ungranted new requests for the resource is empty.
2421          * - The mode of the new request is compatible with the most
2422          *   restrictive mode of all granted locks on the resource.
2423          */
2424
2425         if (now && !conv && list_empty(&r->res_convertqueue) &&
2426             list_empty(&r->res_waitqueue))
2427                 return 1;
2428
2429         /*
2430          * 6-4: Once a lock request is in the queue of ungranted new requests,
2431          * it cannot be granted until the queue of ungranted conversion
2432          * requests is empty, all ungranted new requests ahead of it are
2433          * granted and/or canceled, and it is compatible with the granted mode
2434          * of the most restrictive lock granted on the resource.
2435          */
2436
2437         if (!now && !conv && list_empty(&r->res_convertqueue) &&
2438             first_in_list(lkb, &r->res_waitqueue))
2439                 return 1;
2440
2441         return 0;
2442 }
2443
2444 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2445                           int recover, int *err)
2446 {
2447         int rv;
2448         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
2449         int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
2450
2451         if (err)
2452                 *err = 0;
2453
2454         rv = _can_be_granted(r, lkb, now, recover);
2455         if (rv)
2456                 goto out;
2457
2458         /*
2459          * The CONVDEADLK flag is non-standard and tells the dlm to resolve
2460          * conversion deadlocks by demoting grmode to NL, otherwise the dlm
2461          * cancels one of the locks.
2462          */
2463
2464         if (is_convert && can_be_queued(lkb) &&
2465             conversion_deadlock_detect(r, lkb)) {
2466                 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
2467                         lkb->lkb_grmode = DLM_LOCK_NL;
2468                         lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
2469                 } else if (err) {
2470                         *err = -EDEADLK;
2471                 } else {
2472                         log_print("can_be_granted deadlock %x now %d",
2473                                   lkb->lkb_id, now);
2474                         dlm_dump_rsb(r);
2475                 }
2476                 goto out;
2477         }
2478
2479         /*
2480          * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
2481          * to grant a request in a mode other than the normal rqmode.  It's a
2482          * simple way to provide a big optimization to applications that can
2483          * use them.
2484          */
2485
2486         if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
2487                 alt = DLM_LOCK_PR;
2488         else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
2489                 alt = DLM_LOCK_CW;
2490
2491         if (alt) {
2492                 lkb->lkb_rqmode = alt;
2493                 rv = _can_be_granted(r, lkb, now, 0);
2494                 if (rv)
2495                         lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
2496                 else
2497                         lkb->lkb_rqmode = rqmode;
2498         }
2499  out:
2500         return rv;
2501 }
2502
2503 /* Returns the highest requested mode of all blocked conversions; sets
2504    cw if there's a blocked conversion to DLM_LOCK_CW. */
2505
2506 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
2507                                  unsigned int *count)
2508 {
2509         struct dlm_lkb *lkb, *s;
2510         int recover = rsb_flag(r, RSB_RECOVER_GRANT);
2511         int hi, demoted, quit, grant_restart, demote_restart;
2512         int deadlk;
2513
2514         quit = 0;
2515  restart:
2516         grant_restart = 0;
2517         demote_restart = 0;
2518         hi = DLM_LOCK_IV;
2519
2520         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
2521                 demoted = is_demoted(lkb);
2522                 deadlk = 0;
2523
2524                 if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
2525                         grant_lock_pending(r, lkb);
2526                         grant_restart = 1;
2527                         if (count)
2528                                 (*count)++;
2529                         continue;
2530                 }
2531
2532                 if (!demoted && is_demoted(lkb)) {
2533                         log_print("WARN: pending demoted %x node %d %s",
2534                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2535                         demote_restart = 1;
2536                         continue;
2537                 }
2538
2539                 if (deadlk) {
2540                         /*
2541                          * If DLM_LKB_NODLKWT flag is set and conversion
2542                          * deadlock is detected, we request blocking AST and
2543                          * down (or cancel) conversion.
2544                          */
2545                         if (lkb->lkb_exflags & DLM_LKF_NODLCKWT) {
2546                                 if (lkb->lkb_highbast < lkb->lkb_rqmode) {
2547                                         queue_bast(r, lkb, lkb->lkb_rqmode);
2548                                         lkb->lkb_highbast = lkb->lkb_rqmode;
2549                                 }
2550                         } else {
2551                                 log_print("WARN: pending deadlock %x node %d %s",
2552                                           lkb->lkb_id, lkb->lkb_nodeid,
2553                                           r->res_name);
2554                                 dlm_dump_rsb(r);
2555                         }
2556                         continue;
2557                 }
2558
2559                 hi = max_t(int, lkb->lkb_rqmode, hi);
2560
2561                 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
2562                         *cw = 1;
2563         }
2564
2565         if (grant_restart)
2566                 goto restart;
2567         if (demote_restart && !quit) {
2568                 quit = 1;
2569                 goto restart;
2570         }
2571
2572         return max_t(int, high, hi);
2573 }
2574
2575 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
2576                               unsigned int *count)
2577 {
2578         struct dlm_lkb *lkb, *s;
2579
2580         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
2581                 if (can_be_granted(r, lkb, 0, 0, NULL)) {
2582                         grant_lock_pending(r, lkb);
2583                         if (count)
2584                                 (*count)++;
2585                 } else {
2586                         high = max_t(int, lkb->lkb_rqmode, high);
2587                         if (lkb->lkb_rqmode == DLM_LOCK_CW)
2588                                 *cw = 1;
2589                 }
2590         }
2591
2592         return high;
2593 }
2594
2595 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
2596    on either the convert or waiting queue.
2597    high is the largest rqmode of all locks blocked on the convert or
2598    waiting queue. */
2599
2600 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
2601 {
2602         if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
2603                 if (gr->lkb_highbast < DLM_LOCK_EX)
2604                         return 1;
2605                 return 0;
2606         }
2607
2608         if (gr->lkb_highbast < high &&
2609             !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
2610                 return 1;
2611         return 0;
2612 }
2613
2614 static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
2615 {
2616         struct dlm_lkb *lkb, *s;
2617         int high = DLM_LOCK_IV;
2618         int cw = 0;
2619
2620         if (!is_master(r)) {
2621                 log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
2622                 dlm_dump_rsb(r);
2623                 return;
2624         }
2625
2626         high = grant_pending_convert(r, high, &cw, count);
2627         high = grant_pending_wait(r, high, &cw, count);
2628
2629         if (high == DLM_LOCK_IV)
2630                 return;
2631
2632         /*
2633          * If there are locks left on the wait/convert queue then send blocking
2634          * ASTs to granted locks based on the largest requested mode (high)
2635          * found above.
2636          */
2637
2638         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
2639                 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
2640                         if (cw && high == DLM_LOCK_PR &&
2641                             lkb->lkb_grmode == DLM_LOCK_PR)
2642                                 queue_bast(r, lkb, DLM_LOCK_CW);
2643                         else
2644                                 queue_bast(r, lkb, high);
2645                         lkb->lkb_highbast = high;
2646                 }
2647         }
2648 }
2649
2650 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
2651 {
2652         if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
2653             (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
2654                 if (gr->lkb_highbast < DLM_LOCK_EX)
2655                         return 1;
2656                 return 0;
2657         }
2658
2659         if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
2660                 return 1;
2661         return 0;
2662 }
2663
2664 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2665                             struct dlm_lkb *lkb)
2666 {
2667         struct dlm_lkb *gr;
2668
2669         list_for_each_entry(gr, head, lkb_statequeue) {
2670                 /* skip self when sending basts to convertqueue */
2671                 if (gr == lkb)
2672                         continue;
2673                 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2674                         queue_bast(r, gr, lkb->lkb_rqmode);
2675                         gr->lkb_highbast = lkb->lkb_rqmode;
2676                 }
2677         }
2678 }
2679
2680 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2681 {
2682         send_bast_queue(r, &r->res_grantqueue, lkb);
2683 }
2684
2685 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2686 {
2687         send_bast_queue(r, &r->res_grantqueue, lkb);
2688         send_bast_queue(r, &r->res_convertqueue, lkb);
2689 }
2690
2691 /* set_master(r, lkb) -- set the master nodeid of a resource
2692
2693    The purpose of this function is to set the nodeid field in the given
2694    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
2695    known, it can just be copied to the lkb and the function will return
2696    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
2697    before it can be copied to the lkb.
2698
2699    When the rsb nodeid is being looked up remotely, the initial lkb
2700    causing the lookup is kept on the ls_waiters list waiting for the
2701    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
2702    on the rsb's res_lookup list until the master is verified.
2703
2704    Return values:
2705    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2706    1: the rsb master is not available and the lkb has been placed on
2707       a wait queue
2708 */
2709
2710 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2711 {
2712         int our_nodeid = dlm_our_nodeid();
2713
2714         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2715                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2716                 r->res_first_lkid = lkb->lkb_id;
2717                 lkb->lkb_nodeid = r->res_nodeid;
2718                 return 0;
2719         }
2720
2721         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2722                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2723                 return 1;
2724         }
2725
2726         if (r->res_master_nodeid == our_nodeid) {
2727                 lkb->lkb_nodeid = 0;
2728                 return 0;
2729         }
2730
2731         if (r->res_master_nodeid) {
2732                 lkb->lkb_nodeid = r->res_master_nodeid;
2733                 return 0;
2734         }
2735
2736         if (dlm_dir_nodeid(r) == our_nodeid) {
2737                 /* This is a somewhat unusual case; find_rsb will usually
2738                    have set res_master_nodeid when dir nodeid is local, but
2739                    there are cases where we become the dir node after we've
2740                    past find_rsb and go through _request_lock again.
2741                    confirm_master() or process_lookup_list() needs to be
2742                    called after this. */
2743                 log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
2744                           lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2745                           r->res_name);
2746                 r->res_master_nodeid = our_nodeid;
2747                 r->res_nodeid = 0;
2748                 lkb->lkb_nodeid = 0;
2749                 return 0;
2750         }
2751
2752         wait_pending_remove(r);
2753
2754         r->res_first_lkid = lkb->lkb_id;
2755         send_lookup(r, lkb);
2756         return 1;
2757 }
2758
2759 static void process_lookup_list(struct dlm_rsb *r)
2760 {
2761         struct dlm_lkb *lkb, *safe;
2762
2763         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2764                 list_del_init(&lkb->lkb_rsb_lookup);
2765                 _request_lock(r, lkb);
2766                 schedule();
2767         }
2768 }
2769
2770 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2771
2772 static void confirm_master(struct dlm_rsb *r, int error)
2773 {
2774         struct dlm_lkb *lkb;
2775
2776         if (!r->res_first_lkid)
2777                 return;
2778
2779         switch (error) {
2780         case 0:
2781         case -EINPROGRESS:
2782                 r->res_first_lkid = 0;
2783                 process_lookup_list(r);
2784                 break;
2785
2786         case -EAGAIN:
2787         case -EBADR:
2788         case -ENOTBLK:
2789                 /* the remote request failed and won't be retried (it was
2790                    a NOQUEUE, or has been canceled/unlocked); make a waiting
2791                    lkb the first_lkid */
2792
2793                 r->res_first_lkid = 0;
2794
2795                 if (!list_empty(&r->res_lookup)) {
2796                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2797                                          lkb_rsb_lookup);
2798                         list_del_init(&lkb->lkb_rsb_lookup);
2799                         r->res_first_lkid = lkb->lkb_id;
2800                         _request_lock(r, lkb);
2801                 }
2802                 break;
2803
2804         default:
2805                 log_error(r->res_ls, "confirm_master unknown error %d", error);
2806         }
2807 }
2808
2809 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2810                          int namelen, unsigned long timeout_cs,
2811                          void (*ast) (void *astparam),
2812                          void *astparam,
2813                          void (*bast) (void *astparam, int mode),
2814                          struct dlm_args *args)
2815 {
2816         int rv = -EINVAL;
2817
2818         /* check for invalid arg usage */
2819
2820         if (mode < 0 || mode > DLM_LOCK_EX)
2821                 goto out;
2822
2823         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2824                 goto out;
2825
2826         if (flags & DLM_LKF_CANCEL)
2827                 goto out;
2828
2829         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2830                 goto out;
2831
2832         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2833                 goto out;
2834
2835         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2836                 goto out;
2837
2838         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2839                 goto out;
2840
2841         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2842                 goto out;
2843
2844         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2845                 goto out;
2846
2847         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2848                 goto out;
2849
2850         if (!ast || !lksb)
2851                 goto out;
2852
2853         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2854                 goto out;
2855
2856         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2857                 goto out;
2858
2859         /* these args will be copied to the lkb in validate_lock_args,
2860            it cannot be done now because when converting locks, fields in
2861            an active lkb cannot be modified before locking the rsb */
2862
2863         args->flags = flags;
2864         args->astfn = ast;
2865         args->astparam = astparam;
2866         args->bastfn = bast;
2867         args->timeout = timeout_cs;
2868         args->mode = mode;
2869         args->lksb = lksb;
2870         rv = 0;
2871  out:
2872         return rv;
2873 }
2874
2875 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2876 {
2877         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2878                       DLM_LKF_FORCEUNLOCK))
2879                 return -EINVAL;
2880
2881         if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2882                 return -EINVAL;
2883
2884         args->flags = flags;
2885         args->astparam = astarg;
2886         return 0;
2887 }
2888
2889 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2890                               struct dlm_args *args)
2891 {
2892         int rv = -EBUSY;
2893
2894         if (args->flags & DLM_LKF_CONVERT) {
2895                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2896                         goto out;
2897
2898                 if (lkb->lkb_wait_type)
2899                         goto out;
2900
2901                 if (is_overlap(lkb))
2902                         goto out;
2903
2904                 rv = -EINVAL;
2905                 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2906                         goto out;
2907
2908                 if (args->flags & DLM_LKF_QUECVT &&
2909                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2910                         goto out;
2911         }
2912
2913         lkb->lkb_exflags = args->flags;
2914         lkb->lkb_sbflags = 0;
2915         lkb->lkb_astfn = args->astfn;
2916         lkb->lkb_astparam = args->astparam;
2917         lkb->lkb_bastfn = args->bastfn;
2918         lkb->lkb_rqmode = args->mode;
2919         lkb->lkb_lksb = args->lksb;
2920         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2921         lkb->lkb_ownpid = (int) current->pid;
2922         lkb->lkb_timeout_cs = args->timeout;
2923         rv = 0;
2924  out:
2925         if (rv)
2926                 log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2927                           rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2928                           lkb->lkb_status, lkb->lkb_wait_type,
2929                           lkb->lkb_resource->res_name);
2930         return rv;
2931 }
2932
2933 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2934    for success */
2935
2936 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2937    because there may be a lookup in progress and it's valid to do
2938    cancel/unlockf on it */
2939
2940 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2941 {
2942         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2943         int rv = -EINVAL;
2944
2945         if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2946                 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2947                 dlm_print_lkb(lkb);
2948                 goto out;
2949         }
2950
2951         /* an lkb may still exist even though the lock is EOL'ed due to a
2952            cancel, unlock or failed noqueue request; an app can't use these
2953            locks; return same error as if the lkid had not been found at all */
2954
2955         if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2956                 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2957                 rv = -ENOENT;
2958                 goto out;
2959         }
2960
2961         /* an lkb may be waiting for an rsb lookup to complete where the
2962            lookup was initiated by another lock */
2963
2964         if (!list_empty(&lkb->lkb_rsb_lookup)) {
2965                 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2966                         log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2967                         list_del_init(&lkb->lkb_rsb_lookup);
2968                         queue_cast(lkb->lkb_resource, lkb,
2969                                    args->flags & DLM_LKF_CANCEL ?
2970                                    -DLM_ECANCEL : -DLM_EUNLOCK);
2971                         unhold_lkb(lkb); /* undoes create_lkb() */
2972                 }
2973                 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2974                 rv = -EBUSY;
2975                 goto out;
2976         }
2977
2978         /* cancel not allowed with another cancel/unlock in progress */
2979
2980         if (args->flags & DLM_LKF_CANCEL) {
2981                 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2982                         goto out;
2983
2984                 if (is_overlap(lkb))
2985                         goto out;
2986
2987                 /* don't let scand try to do a cancel */
2988                 del_timeout(lkb);
2989
2990                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2991                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2992                         rv = -EBUSY;
2993                         goto out;
2994                 }
2995
2996                 /* there's nothing to cancel */
2997                 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2998                     !lkb->lkb_wait_type) {
2999                         rv = -EBUSY;
3000                         goto out;
3001                 }
3002
3003                 switch (lkb->lkb_wait_type) {
3004                 case DLM_MSG_LOOKUP:
3005                 case DLM_MSG_REQUEST:
3006                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
3007                         rv = -EBUSY;
3008                         goto out;
3009                 case DLM_MSG_UNLOCK:
3010                 case DLM_MSG_CANCEL:
3011                         goto out;
3012                 }
3013                 /* add_to_waiters() will set OVERLAP_CANCEL */
3014                 goto out_ok;
3015         }
3016
3017         /* do we need to allow a force-unlock if there's a normal unlock
3018            already in progress?  in what conditions could the normal unlock
3019            fail such that we'd want to send a force-unlock to be sure? */
3020
3021         if (args->flags & DLM_LKF_FORCEUNLOCK) {
3022                 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
3023                         goto out;
3024
3025                 if (is_overlap_unlock(lkb))
3026                         goto out;
3027
3028                 /* don't let scand try to do a cancel */
3029                 del_timeout(lkb);
3030
3031                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
3032                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
3033                         rv = -EBUSY;
3034                         goto out;
3035                 }
3036
3037                 switch (lkb->lkb_wait_type) {
3038                 case DLM_MSG_LOOKUP:
3039                 case DLM_MSG_REQUEST:
3040                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
3041                         rv = -EBUSY;
3042                         goto out;
3043                 case DLM_MSG_UNLOCK:
3044                         goto out;
3045                 }
3046                 /* add_to_waiters() will set OVERLAP_UNLOCK */
3047                 goto out_ok;
3048         }
3049
3050         /* normal unlock not allowed if there's any op in progress */
3051         rv = -EBUSY;
3052         if (lkb->lkb_wait_type || lkb->lkb_wait_count)
3053                 goto out;
3054
3055  out_ok:
3056         /* an overlapping op shouldn't blow away exflags from other op */
3057         lkb->lkb_exflags |= args->flags;
3058         lkb->lkb_sbflags = 0;
3059         lkb->lkb_astparam = args->astparam;
3060         rv = 0;
3061  out:
3062         if (rv)
3063                 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
3064                           lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
3065                           args->flags, lkb->lkb_wait_type,
3066                           lkb->lkb_resource->res_name);
3067         return rv;
3068 }
3069
3070 /*
3071  * Four stage 4 varieties:
3072  * do_request(), do_convert(), do_unlock(), do_cancel()
3073  * These are called on the master node for the given lock and
3074  * from the central locking logic.
3075  */
3076
3077 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3078 {
3079         int error = 0;
3080
3081         if (can_be_granted(r, lkb, 1, 0, NULL)) {
3082                 grant_lock(r, lkb);
3083                 queue_cast(r, lkb, 0);
3084                 goto out;
3085         }
3086
3087         if (can_be_queued(lkb)) {
3088                 error = -EINPROGRESS;
3089                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
3090                 add_timeout(lkb);
3091                 goto out;
3092         }
3093
3094         error = -EAGAIN;
3095         queue_cast(r, lkb, -EAGAIN);
3096  out:
3097         return error;
3098 }
3099
3100 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3101                                int error)
3102 {
3103         switch (error) {
3104         case -EAGAIN:
3105                 if (force_blocking_asts(lkb))
3106                         send_blocking_asts_all(r, lkb);
3107                 break;
3108         case -EINPROGRESS:
3109                 send_blocking_asts(r, lkb);
3110                 break;
3111         }
3112 }
3113
3114 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3115 {
3116         int error = 0;
3117         int deadlk = 0;
3118
3119         /* changing an existing lock may allow others to be granted */
3120
3121         if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
3122                 grant_lock(r, lkb);
3123                 queue_cast(r, lkb, 0);
3124                 goto out;
3125         }
3126
3127         /* can_be_granted() detected that this lock would block in a conversion
3128            deadlock, so we leave it on the granted queue and return EDEADLK in
3129            the ast for the convert. */
3130
3131         if (deadlk && !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
3132                 /* it's left on the granted queue */
3133                 revert_lock(r, lkb);
3134                 queue_cast(r, lkb, -EDEADLK);
3135                 error = -EDEADLK;
3136                 goto out;
3137         }
3138
3139         /* is_demoted() means the can_be_granted() above set the grmode
3140            to NL, and left us on the granted queue.  This auto-demotion
3141            (due to CONVDEADLK) might mean other locks, and/or this lock, are
3142            now grantable.  We have to try to grant other converting locks
3143            before we try again to grant this one. */
3144
3145         if (is_demoted(lkb)) {
3146                 grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
3147                 if (_can_be_granted(r, lkb, 1, 0)) {
3148                         grant_lock(r, lkb);
3149                         queue_cast(r, lkb, 0);
3150                         goto out;
3151                 }
3152                 /* else fall through and move to convert queue */
3153         }
3154
3155         if (can_be_queued(lkb)) {
3156                 error = -EINPROGRESS;
3157                 del_lkb(r, lkb);
3158                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3159                 add_timeout(lkb);
3160                 goto out;
3161         }
3162
3163         error = -EAGAIN;
3164         queue_cast(r, lkb, -EAGAIN);
3165  out:
3166         return error;
3167 }
3168
3169 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3170                                int error)
3171 {
3172         switch (error) {
3173         case 0:
3174                 grant_pending_locks(r, NULL);
3175                 /* grant_pending_locks also sends basts */
3176                 break;
3177         case -EAGAIN:
3178                 if (force_blocking_asts(lkb))
3179                         send_blocking_asts_all(r, lkb);
3180                 break;
3181         case -EINPROGRESS:
3182                 send_blocking_asts(r, lkb);
3183                 break;
3184         }
3185 }
3186
3187 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3188 {
3189         remove_lock(r, lkb);
3190         queue_cast(r, lkb, -DLM_EUNLOCK);
3191         return -DLM_EUNLOCK;
3192 }
3193
3194 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3195                               int error)
3196 {
3197         grant_pending_locks(r, NULL);
3198 }
3199
3200 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
3201
3202 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3203 {
3204         int error;
3205
3206         error = revert_lock(r, lkb);
3207         if (error) {
3208                 queue_cast(r, lkb, -DLM_ECANCEL);
3209                 return -DLM_ECANCEL;
3210         }
3211         return 0;
3212 }
3213
3214 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3215                               int error)
3216 {
3217         if (error)
3218                 grant_pending_locks(r, NULL);
3219 }
3220
3221 /*
3222  * Four stage 3 varieties:
3223  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
3224  */
3225
3226 /* add a new lkb to a possibly new rsb, called by requesting process */
3227
3228 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3229 {
3230         int error;
3231
3232         /* set_master: sets lkb nodeid from r */
3233
3234         error = set_master(r, lkb);
3235         if (error < 0)
3236                 goto out;
3237         if (error) {
3238                 error = 0;
3239                 goto out;
3240         }
3241
3242         if (is_remote(r)) {
3243                 /* receive_request() calls do_request() on remote node */
3244                 error = send_request(r, lkb);
3245         } else {
3246                 error = do_request(r, lkb);
3247                 /* for remote locks the request_reply is sent
3248                    between do_request and do_request_effects */
3249                 do_request_effects(r, lkb, error);
3250         }
3251  out:
3252         return error;
3253 }
3254
3255 /* change some property of an existing lkb, e.g. mode */
3256
3257 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3258 {
3259         int error;
3260
3261         if (is_remote(r)) {
3262                 /* receive_convert() calls do_convert() on remote node */
3263                 error = send_convert(r, lkb);
3264         } else {
3265                 error = do_convert(r, lkb);
3266                 /* for remote locks the convert_reply is sent
3267                    between do_convert and do_convert_effects */
3268                 do_convert_effects(r, lkb, error);
3269         }
3270
3271         return error;
3272 }
3273
3274 /* remove an existing lkb from the granted queue */
3275
3276 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3277 {
3278         int error;
3279
3280         if (is_remote(r)) {
3281                 /* receive_unlock() calls do_unlock() on remote node */
3282                 error = send_unlock(r, lkb);
3283         } else {
3284                 error = do_unlock(r, lkb);
3285                 /* for remote locks the unlock_reply is sent
3286                    between do_unlock and do_unlock_effects */
3287                 do_unlock_effects(r, lkb, error);
3288         }
3289
3290         return error;
3291 }
3292
3293 /* remove an existing lkb from the convert or wait queue */
3294
3295 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3296 {
3297         int error;
3298
3299         if (is_remote(r)) {
3300                 /* receive_cancel() calls do_cancel() on remote node */
3301                 error = send_cancel(r, lkb);
3302         } else {
3303                 error = do_cancel(r, lkb);
3304                 /* for remote locks the cancel_reply is sent
3305                    between do_cancel and do_cancel_effects */
3306                 do_cancel_effects(r, lkb, error);
3307         }
3308
3309         return error;
3310 }
3311
3312 /*
3313  * Four stage 2 varieties:
3314  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
3315  */
3316
3317 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
3318                         int len, struct dlm_args *args)
3319 {
3320         struct dlm_rsb *r;
3321         int error;
3322
3323         error = validate_lock_args(ls, lkb, args);
3324         if (error)
3325                 return error;
3326
3327         error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
3328         if (error)
3329                 return error;
3330
3331         lock_rsb(r);
3332
3333         attach_lkb(r, lkb);
3334         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
3335
3336         error = _request_lock(r, lkb);
3337
3338         unlock_rsb(r);
3339         put_rsb(r);
3340         return error;
3341 }
3342
3343 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3344                         struct dlm_args *args)
3345 {
3346         struct dlm_rsb *r;
3347         int error;
3348
3349         r = lkb->lkb_resource;
3350
3351         hold_rsb(r);
3352         lock_rsb(r);
3353
3354         error = validate_lock_args(ls, lkb, args);
3355         if (error)
3356                 goto out;
3357
3358         error = _convert_lock(r, lkb);
3359  out:
3360         unlock_rsb(r);
3361         put_rsb(r);
3362         return error;
3363 }
3364
3365 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3366                        struct dlm_args *args)
3367 {
3368         struct dlm_rsb *r;
3369         int error;
3370
3371         r = lkb->lkb_resource;
3372
3373         hold_rsb(r);
3374         lock_rsb(r);
3375
3376         error = validate_unlock_args(lkb, args);
3377         if (error)
3378                 goto out;
3379
3380         error = _unlock_lock(r, lkb);
3381  out:
3382         unlock_rsb(r);
3383         put_rsb(r);
3384         return error;
3385 }
3386
3387 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3388                        struct dlm_args *args)
3389 {
3390         struct dlm_rsb *r;
3391         int error;
3392
3393         r = lkb->lkb_resource;
3394
3395         hold_rsb(r);
3396         lock_rsb(r);
3397
3398         error = validate_unlock_args(lkb, args);
3399         if (error)
3400                 goto out;
3401
3402         error = _cancel_lock(r, lkb);
3403  out:
3404         unlock_rsb(r);
3405         put_rsb(r);
3406         return error;
3407 }
3408
3409 /*
3410  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
3411  */
3412
3413 int dlm_lock(dlm_lockspace_t *lockspace,
3414              int mode,
3415              struct dlm_lksb *lksb,
3416              uint32_t flags,
3417              void *name,
3418              unsigned int namelen,
3419              uint32_t parent_lkid,
3420              void (*ast) (void *astarg),
3421              void *astarg,
3422              void (*bast) (void *astarg, int mode))
3423 {
3424         struct dlm_ls *ls;
3425         struct dlm_lkb *lkb;
3426         struct dlm_args args;
3427         int error, convert = flags & DLM_LKF_CONVERT;
3428
3429         ls = dlm_find_lockspace_local(lockspace);
3430         if (!ls)
3431                 return -EINVAL;
3432
3433         dlm_lock_recovery(ls);
3434
3435         if (convert)
3436                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
3437         else
3438                 error = create_lkb(ls, &lkb);
3439
3440         if (error)
3441                 goto out;
3442
3443         error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
3444                               astarg, bast, &args);
3445         if (error)
3446                 goto out_put;
3447
3448         if (convert)
3449                 error = convert_lock(ls, lkb, &args);
3450         else
3451                 error = request_lock(ls, lkb, name, namelen, &args);
3452
3453         if (error == -EINPROGRESS)
3454                 error = 0;
3455  out_put:
3456         if (convert || error)
3457                 __put_lkb(ls, lkb);
3458         if (error == -EAGAIN || error == -EDEADLK)
3459                 error = 0;
3460  out:
3461         dlm_unlock_recovery(ls);
3462         dlm_put_lockspace(ls);
3463         return error;
3464 }
3465
3466 int dlm_unlock(dlm_lockspace_t *lockspace,
3467                uint32_t lkid,
3468                uint32_t flags,
3469                struct dlm_lksb *lksb,
3470                void *astarg)
3471 {
3472         struct dlm_ls *ls;
3473         struct dlm_lkb *lkb;
3474         struct dlm_args args;
3475         int error;
3476
3477         ls = dlm_find_lockspace_local(lockspace);
3478         if (!ls)
3479                 return -EINVAL;
3480
3481         dlm_lock_recovery(ls);
3482
3483         error = find_lkb(ls, lkid, &lkb);
3484         if (error)
3485                 goto out;
3486
3487         error = set_unlock_args(flags, astarg, &args);
3488         if (error)
3489                 goto out_put;
3490
3491         if (flags & DLM_LKF_CANCEL)
3492                 error = cancel_lock(ls, lkb, &args);
3493         else
3494                 error = unlock_lock(ls, lkb, &args);
3495
3496         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
3497                 error = 0;
3498         if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
3499                 error = 0;
3500  out_put:
3501         dlm_put_lkb(lkb);
3502  out:
3503         dlm_unlock_recovery(ls);
3504         dlm_put_lockspace(ls);
3505         return error;
3506 }
3507
3508 /*
3509  * send/receive routines for remote operations and replies
3510  *
3511  * send_args
3512  * send_common
3513  * send_request                 receive_request
3514  * send_convert                 receive_convert
3515  * send_unlock                  receive_unlock
3516  * send_cancel                  receive_cancel
3517  * send_grant                   receive_grant
3518  * send_bast                    receive_bast
3519  * send_lookup                  receive_lookup
3520  * send_remove                  receive_remove
3521  *
3522  *                              send_common_reply
3523  * receive_request_reply        send_request_reply
3524  * receive_convert_reply        send_convert_reply
3525  * receive_unlock_reply         send_unlock_reply
3526  * receive_cancel_reply         send_cancel_reply
3527  * receive_lookup_reply         send_lookup_reply
3528  */
3529
3530 static int _create_message(struct dlm_ls *ls, int mb_len,
3531                            int to_nodeid, int mstype,
3532                            struct dlm_message **ms_ret,
3533                            struct dlm_mhandle **mh_ret)
3534 {
3535         struct dlm_message *ms;
3536         struct dlm_mhandle *mh;
3537         char *mb;
3538
3539         /* get_buffer gives us a message handle (mh) that we need to
3540            pass into lowcomms_commit and a message buffer (mb) that we
3541            write our data into */
3542
3543         mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
3544         if (!mh)
3545                 return -ENOBUFS;
3546
3547         memset(mb, 0, mb_len);
3548
3549         ms = (struct dlm_message *) mb;
3550
3551         ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
3552         ms->m_header.h_lockspace = ls->ls_global_id;
3553         ms->m_header.h_nodeid = dlm_our_nodeid();
3554         ms->m_header.h_length = mb_len;
3555         ms->m_header.h_cmd = DLM_MSG;
3556
3557         ms->m_type = mstype;
3558
3559         *mh_ret = mh;
3560         *ms_ret = ms;
3561         return 0;
3562 }
3563
3564 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
3565                           int to_nodeid, int mstype,
3566                           struct dlm_message **ms_ret,
3567                           struct dlm_mhandle **mh_ret)
3568 {
3569         int mb_len = sizeof(struct dlm_message);
3570
3571         switch (mstype) {
3572         case DLM_MSG_REQUEST:
3573         case DLM_MSG_LOOKUP:
3574         case DLM_MSG_REMOVE:
3575                 mb_len += r->res_length;
3576                 break;
3577         case DLM_MSG_CONVERT:
3578         case DLM_MSG_UNLOCK:
3579         case DLM_MSG_REQUEST_REPLY:
3580         case DLM_MSG_CONVERT_REPLY:
3581         case DLM_MSG_GRANT:
3582                 if (lkb && lkb->lkb_lvbptr)
3583                         mb_len += r->res_ls->ls_lvblen;
3584                 break;
3585         }
3586
3587         return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
3588                                ms_ret, mh_ret);
3589 }
3590
3591 /* further lowcomms enhancements or alternate implementations may make
3592    the return value from this function useful at some point */
3593
3594 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
3595 {
3596         dlm_message_out(ms);
3597         dlm_lowcomms_commit_buffer(mh);
3598         return 0;
3599 }
3600
3601 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
3602                       struct dlm_message *ms)
3603 {
3604         ms->m_nodeid   = lkb->lkb_nodeid;
3605         ms->m_pid      = lkb->lkb_ownpid;
3606         ms->m_lkid     = lkb->lkb_id;
3607         ms->m_remid    = lkb->lkb_remid;
3608         ms->m_exflags  = lkb->lkb_exflags;
3609         ms->m_sbflags  = lkb->lkb_sbflags;
3610         ms->m_flags    = lkb->lkb_flags;
3611         ms->m_lvbseq   = lkb->lkb_lvbseq;
3612         ms->m_status   = lkb->lkb_status;
3613         ms->m_grmode   = lkb->lkb_grmode;
3614         ms->m_rqmode   = lkb->lkb_rqmode;
3615         ms->m_hash     = r->res_hash;
3616
3617         /* m_result and m_bastmode are set from function args,
3618            not from lkb fields */
3619
3620         if (lkb->lkb_bastfn)
3621                 ms->m_asts |= DLM_CB_BAST;
3622         if (lkb->lkb_astfn)
3623                 ms->m_asts |= DLM_CB_CAST;
3624
3625         /* compare with switch in create_message; send_remove() doesn't
3626            use send_args() */
3627
3628         switch (ms->m_type) {
3629         case DLM_MSG_REQUEST:
3630         case DLM_MSG_LOOKUP:
3631                 memcpy(ms->m_extra, r->res_name, r->res_length);
3632                 break;
3633         case DLM_MSG_CONVERT:
3634         case DLM_MSG_UNLOCK:
3635         case DLM_MSG_REQUEST_REPLY:
3636         case DLM_MSG_CONVERT_REPLY:
3637         case DLM_MSG_GRANT:
3638                 if (!lkb->lkb_lvbptr)
3639                         break;
3640                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
3641                 break;
3642         }
3643 }
3644
3645 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3646 {
3647         struct dlm_message *ms;
3648         struct dlm_mhandle *mh;
3649         int to_nodeid, error;
3650
3651         to_nodeid = r->res_nodeid;
3652
3653         error = add_to_waiters(lkb, mstype, to_nodeid);
3654         if (error)
3655                 return error;
3656
3657         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3658         if (error)
3659                 goto fail;
3660
3661         send_args(r, lkb, ms);
3662
3663         error = send_message(mh, ms);
3664         if (error)
3665                 goto fail;
3666         return 0;
3667
3668  fail:
3669         remove_from_waiters(lkb, msg_reply_type(mstype));
3670         return error;
3671 }
3672
3673 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3674 {
3675         return send_common(r, lkb, DLM_MSG_REQUEST);
3676 }
3677
3678 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3679 {
3680         int error;
3681
3682         error = send_common(r, lkb, DLM_MSG_CONVERT);
3683
3684         /* down conversions go without a reply from the master */
3685         if (!error && down_conversion(lkb)) {
3686                 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3687                 r->res_ls->ls_stub_ms.m_flags = DLM_IFL_STUB_MS;
3688                 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3689                 r->res_ls->ls_stub_ms.m_result = 0;
3690                 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
3691         }
3692
3693         return error;
3694 }
3695
3696 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
3697    MASTER_UNCERTAIN to force the next request on the rsb to confirm
3698    that the master is still correct. */
3699
3700 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3701 {
3702         return send_common(r, lkb, DLM_MSG_UNLOCK);
3703 }
3704
3705 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3706 {
3707         return send_common(r, lkb, DLM_MSG_CANCEL);
3708 }
3709
3710 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3711 {
3712         struct dlm_message *ms;
3713         struct dlm_mhandle *mh;
3714         int to_nodeid, error;
3715
3716         to_nodeid = lkb->lkb_nodeid;
3717
3718         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3719         if (error)
3720                 goto out;
3721
3722         send_args(r, lkb, ms);
3723
3724         ms->m_result = 0;
3725
3726         error = send_message(mh, ms);
3727  out:
3728         return error;
3729 }
3730
3731 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3732 {
3733         struct dlm_message *ms;
3734         struct dlm_mhandle *mh;
3735         int to_nodeid, error;
3736
3737         to_nodeid = lkb->lkb_nodeid;
3738
3739         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3740         if (error)
3741                 goto out;
3742
3743         send_args(r, lkb, ms);
3744
3745         ms->m_bastmode = mode;
3746
3747         error = send_message(mh, ms);
3748  out:
3749         return error;
3750 }
3751
3752 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3753 {
3754         struct dlm_message *ms;
3755         struct dlm_mhandle *mh;
3756         int to_nodeid, error;
3757
3758         to_nodeid = dlm_dir_nodeid(r);
3759
3760         error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3761         if (error)
3762                 return error;
3763
3764         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3765         if (error)
3766                 goto fail;
3767
3768         send_args(r, lkb, ms);
3769
3770         error = send_message(mh, ms);
3771         if (error)
3772                 goto fail;
3773         return 0;
3774
3775  fail:
3776         remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3777         return error;
3778 }
3779
3780 static int send_remove(struct dlm_rsb *r)
3781 {
3782         struct dlm_message *ms;
3783         struct dlm_mhandle *mh;
3784         int to_nodeid, error;
3785
3786         to_nodeid = dlm_dir_nodeid(r);
3787
3788         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3789         if (error)
3790                 goto out;
3791
3792         memcpy(ms->m_extra, r->res_name, r->res_length);
3793         ms->m_hash = r->res_hash;
3794
3795         error = send_message(mh, ms);
3796  out:
3797         return error;
3798 }
3799
3800 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3801                              int mstype, int rv)
3802 {
3803         struct dlm_message *ms;
3804         struct dlm_mhandle *mh;
3805         int to_nodeid, error;
3806
3807         to_nodeid = lkb->lkb_nodeid;
3808
3809         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3810         if (error)
3811                 goto out;
3812
3813         send_args(r, lkb, ms);
3814
3815         ms->m_result = rv;
3816
3817         error = send_message(mh, ms);
3818  out:
3819         return error;
3820 }
3821
3822 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3823 {
3824         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3825 }
3826
3827 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3828 {
3829         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3830 }
3831
3832 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3833 {
3834         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3835 }
3836
3837 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3838 {
3839         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3840 }
3841
3842 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3843                              int ret_nodeid, int rv)
3844 {
3845         struct dlm_rsb *r = &ls->ls_stub_rsb;
3846         struct dlm_message *ms;
3847         struct dlm_mhandle *mh;
3848         int error, nodeid = ms_in->m_header.h_nodeid;
3849
3850         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3851         if (error)
3852                 goto out;
3853
3854         ms->m_lkid = ms_in->m_lkid;
3855         ms->m_result = rv;
3856         ms->m_nodeid = ret_nodeid;
3857
3858         error = send_message(mh, ms);
3859  out:
3860         return error;
3861 }
3862
3863 /* which args we save from a received message depends heavily on the type
3864    of message, unlike the send side where we can safely send everything about
3865    the lkb for any type of message */
3866
3867 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3868 {
3869         lkb->lkb_exflags = ms->m_exflags;
3870         lkb->lkb_sbflags = ms->m_sbflags;
3871         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3872                          (ms->m_flags & 0x0000FFFF);
3873 }
3874
3875 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3876 {
3877         if (ms->m_flags == DLM_IFL_STUB_MS)
3878                 return;
3879
3880         lkb->lkb_sbflags = ms->m_sbflags;
3881         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3882                          (ms->m_flags & 0x0000FFFF);
3883 }
3884
3885 static int receive_extralen(struct dlm_message *ms)
3886 {
3887         return (ms->m_header.h_length - sizeof(struct dlm_message));
3888 }
3889
3890 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3891                        struct dlm_message *ms)
3892 {
3893         int len;
3894
3895         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3896                 if (!lkb->lkb_lvbptr)
3897                         lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3898                 if (!lkb->lkb_lvbptr)
3899                         return -ENOMEM;
3900                 len = receive_extralen(ms);
3901                 if (len > ls->ls_lvblen)
3902                         len = ls->ls_lvblen;
3903                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3904         }
3905         return 0;
3906 }
3907
3908 static void fake_bastfn(void *astparam, int mode)
3909 {
3910         log_print("fake_bastfn should not be called");
3911 }
3912
3913 static void fake_astfn(void *astparam)
3914 {
3915         log_print("fake_astfn should not be called");
3916 }
3917
3918 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3919                                 struct dlm_message *ms)
3920 {
3921         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3922         lkb->lkb_ownpid = ms->m_pid;
3923         lkb->lkb_remid = ms->m_lkid;
3924         lkb->lkb_grmode = DLM_LOCK_IV;
3925         lkb->lkb_rqmode = ms->m_rqmode;
3926
3927         lkb->lkb_bastfn = (ms->m_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
3928         lkb->lkb_astfn = (ms->m_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
3929
3930         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3931                 /* lkb was just created so there won't be an lvb yet */
3932                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3933                 if (!lkb->lkb_lvbptr)
3934                         return -ENOMEM;
3935         }
3936
3937         return 0;
3938 }
3939
3940 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3941                                 struct dlm_message *ms)
3942 {
3943         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3944                 return -EBUSY;
3945
3946         if (receive_lvb(ls, lkb, ms))
3947                 return -ENOMEM;
3948
3949         lkb->lkb_rqmode = ms->m_rqmode;
3950         lkb->lkb_lvbseq = ms->m_lvbseq;
3951
3952         return 0;
3953 }
3954
3955 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3956                                struct dlm_message *ms)
3957 {
3958         if (receive_lvb(ls, lkb, ms))
3959                 return -ENOMEM;
3960         return 0;
3961 }
3962
3963 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3964    uses to send a reply and that the remote end uses to process the reply. */
3965
3966 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3967 {
3968         struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3969         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3970         lkb->lkb_remid = ms->m_lkid;
3971 }
3972
3973 /* This is called after the rsb is locked so that we can safely inspect
3974    fields in the lkb. */
3975
3976 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3977 {
3978         int from = ms->m_header.h_nodeid;
3979         int error = 0;
3980
3981         /* currently mixing of user/kernel locks are not supported */
3982         if (ms->m_flags & DLM_IFL_USER && ~lkb->lkb_flags & DLM_IFL_USER) {
3983                 log_error(lkb->lkb_resource->res_ls,
3984                           "got user dlm message for a kernel lock");
3985                 error = -EINVAL;
3986                 goto out;
3987         }
3988
3989         switch (ms->m_type) {
3990         case DLM_MSG_CONVERT:
3991         case DLM_MSG_UNLOCK:
3992         case DLM_MSG_CANCEL:
3993                 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3994                         error = -EINVAL;
3995                 break;
3996
3997         case DLM_MSG_CONVERT_REPLY:
3998         case DLM_MSG_UNLOCK_REPLY:
3999         case DLM_MSG_CANCEL_REPLY:
4000         case DLM_MSG_GRANT:
4001         case DLM_MSG_BAST:
4002                 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
4003                         error = -EINVAL;
4004                 break;
4005
4006         case DLM_MSG_REQUEST_REPLY:
4007                 if (!is_process_copy(lkb))
4008                         error = -EINVAL;
4009                 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
4010                         error = -EINVAL;
4011                 break;
4012
4013         default:
4014                 error = -EINVAL;
4015         }
4016
4017 out:
4018         if (error)
4019                 log_error(lkb->lkb_resource->res_ls,
4020                           "ignore invalid message %d from %d %x %x %x %d",
4021                           ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
4022                           lkb->lkb_flags, lkb->lkb_nodeid);
4023         return error;
4024 }
4025
4026 static void send_repeat_remove(struct dlm_ls *ls, char *ms_name, int len)
4027 {
4028         char name[DLM_RESNAME_MAXLEN + 1];
4029         struct dlm_message *ms;
4030         struct dlm_mhandle *mh;
4031         struct dlm_rsb *r;
4032         uint32_t hash, b;
4033         int rv, dir_nodeid;
4034
4035         memset(name, 0, sizeof(name));
4036         memcpy(name, ms_name, len);
4037
4038         hash = jhash(name, len, 0);
4039         b = hash & (ls->ls_rsbtbl_size - 1);
4040
4041         dir_nodeid = dlm_hash2nodeid(ls, hash);
4042
4043         log_error(ls, "send_repeat_remove dir %d %s", dir_nodeid, name);
4044
4045         spin_lock(&ls->ls_rsbtbl[b].lock);
4046         rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
4047         if (!rv) {
4048                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4049                 log_error(ls, "repeat_remove on keep %s", name);
4050                 return;
4051         }
4052
4053         rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
4054         if (!rv) {
4055                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4056                 log_error(ls, "repeat_remove on toss %s", name);
4057                 return;
4058         }
4059
4060         /* use ls->remove_name2 to avoid conflict with shrink? */
4061
4062         spin_lock(&ls->ls_remove_spin);
4063         ls->ls_remove_len = len;
4064         memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
4065         spin_unlock(&ls->ls_remove_spin);
4066         spin_unlock(&ls->ls_rsbtbl[b].lock);
4067
4068         rv = _create_message(ls, sizeof(struct dlm_message) + len,
4069                              dir_nodeid, DLM_MSG_REMOVE, &ms, &mh);
4070         if (rv)
4071                 goto out;
4072
4073         memcpy(ms->m_extra, name, len);
4074         ms->m_hash = hash;
4075
4076         send_message(mh, ms);
4077
4078 out:
4079         spin_lock(&ls->ls_remove_spin);
4080         ls->ls_remove_len = 0;
4081         memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
4082         spin_unlock(&ls->ls_remove_spin);
4083 }
4084
4085 static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
4086 {
4087         struct dlm_lkb *lkb;
4088         struct dlm_rsb *r;
4089         int from_nodeid;
4090         int error, namelen = 0;
4091
4092         from_nodeid = ms->m_header.h_nodeid;
4093
4094         error = create_lkb(ls, &lkb);
4095         if (error)
4096                 goto fail;
4097
4098         receive_flags(lkb, ms);
4099         lkb->lkb_flags |= DLM_IFL_MSTCPY;
4100         error = receive_request_args(ls, lkb, ms);
4101         if (error) {
4102                 __put_lkb(ls, lkb);
4103                 goto fail;
4104         }
4105
4106         /* The dir node is the authority on whether we are the master
4107            for this rsb or not, so if the master sends us a request, we should
4108            recreate the rsb if we've destroyed it.   This race happens when we
4109            send a remove message to the dir node at the same time that the dir
4110            node sends us a request for the rsb. */
4111
4112         namelen = receive_extralen(ms);
4113
4114         error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
4115                          R_RECEIVE_REQUEST, &r);
4116         if (error) {
4117                 __put_lkb(ls, lkb);
4118                 goto fail;
4119         }
4120
4121         lock_rsb(r);
4122
4123         if (r->res_master_nodeid != dlm_our_nodeid()) {
4124                 error = validate_master_nodeid(ls, r, from_nodeid);
4125                 if (error) {
4126                         unlock_rsb(r);
4127                         put_rsb(r);
4128                         __put_lkb(ls, lkb);
4129                         goto fail;
4130                 }
4131         }
4132
4133         attach_lkb(r, lkb);
4134         error = do_request(r, lkb);
4135         send_request_reply(r, lkb, error);
4136         do_request_effects(r, lkb, error);
4137
4138         unlock_rsb(r);
4139         put_rsb(r);
4140
4141         if (error == -EINPROGRESS)
4142                 error = 0;
4143         if (error)
4144                 dlm_put_lkb(lkb);
4145         return 0;
4146
4147  fail:
4148         /* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
4149            and do this receive_request again from process_lookup_list once
4150            we get the lookup reply.  This would avoid a many repeated
4151            ENOTBLK request failures when the lookup reply designating us
4152            as master is delayed. */
4153
4154         /* We could repeatedly return -EBADR here if our send_remove() is
4155            delayed in being sent/arriving/being processed on the dir node.
4156            Another node would repeatedly lookup up the master, and the dir
4157            node would continue returning our nodeid until our send_remove
4158            took effect.
4159
4160            We send another remove message in case our previous send_remove
4161            was lost/ignored/missed somehow. */
4162
4163         if (error != -ENOTBLK) {
4164                 log_limit(ls, "receive_request %x from %d %d",
4165                           ms->m_lkid, from_nodeid, error);
4166         }
4167
4168         if (namelen && error == -EBADR) {
4169                 send_repeat_remove(ls, ms->m_extra, namelen);
4170                 msleep(1000);
4171         }
4172
4173         setup_stub_lkb(ls, ms);
4174         send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4175         return error;
4176 }
4177
4178 static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
4179 {
4180         struct dlm_lkb *lkb;
4181         struct dlm_rsb *r;
4182         int error, reply = 1;
4183
4184         error = find_lkb(ls, ms->m_remid, &lkb);
4185         if (error)
4186                 goto fail;
4187
4188         if (lkb->lkb_remid != ms->m_lkid) {
4189                 log_error(ls, "receive_convert %x remid %x recover_seq %llu "
4190                           "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
4191                           (unsigned long long)lkb->lkb_recover_seq,
4192                           ms->m_header.h_nodeid, ms->m_lkid);
4193                 error = -ENOENT;
4194                 dlm_put_lkb(lkb);
4195                 goto fail;
4196         }
4197
4198         r = lkb->lkb_resource;
4199
4200         hold_rsb(r);
4201         lock_rsb(r);
4202
4203         error = validate_message(lkb, ms);
4204         if (error)
4205                 goto out;
4206
4207         receive_flags(lkb, ms);
4208
4209         error = receive_convert_args(ls, lkb, ms);
4210         if (error) {
4211                 send_convert_reply(r, lkb, error);
4212                 goto out;
4213         }
4214
4215         reply = !down_conversion(lkb);
4216
4217         error = do_convert(r, lkb);
4218         if (reply)
4219                 send_convert_reply(r, lkb, error);
4220         do_convert_effects(r, lkb, error);
4221  out:
4222         unlock_rsb(r);
4223         put_rsb(r);
4224         dlm_put_lkb(lkb);
4225         return 0;
4226
4227  fail:
4228         setup_stub_lkb(ls, ms);
4229         send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4230         return error;
4231 }
4232
4233 static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
4234 {
4235         struct dlm_lkb *lkb;
4236         struct dlm_rsb *r;
4237         int error;
4238
4239         error = find_lkb(ls, ms->m_remid, &lkb);
4240         if (error)
4241                 goto fail;
4242
4243         if (lkb->lkb_remid != ms->m_lkid) {
4244                 log_error(ls, "receive_unlock %x remid %x remote %d %x",
4245                           lkb->lkb_id, lkb->lkb_remid,
4246                           ms->m_header.h_nodeid, ms->m_lkid);
4247                 error = -ENOENT;
4248                 dlm_put_lkb(lkb);
4249                 goto fail;
4250         }
4251
4252         r = lkb->lkb_resource;
4253
4254         hold_rsb(r);
4255         lock_rsb(r);
4256
4257         error = validate_message(lkb, ms);
4258         if (error)
4259                 goto out;
4260
4261         receive_flags(lkb, ms);
4262
4263         error = receive_unlock_args(ls, lkb, ms);
4264         if (error) {
4265                 send_unlock_reply(r, lkb, error);
4266                 goto out;
4267         }
4268
4269         error = do_unlock(r, lkb);
4270         send_unlock_reply(r, lkb, error);
4271         do_unlock_effects(r, lkb, error);
4272  out:
4273         unlock_rsb(r);
4274         put_rsb(r);
4275         dlm_put_lkb(lkb);
4276         return 0;
4277
4278  fail:
4279         setup_stub_lkb(ls, ms);
4280         send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4281         return error;
4282 }
4283
4284 static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
4285 {
4286         struct dlm_lkb *lkb;
4287         struct dlm_rsb *r;
4288         int error;
4289
4290         error = find_lkb(ls, ms->m_remid, &lkb);
4291         if (error)
4292                 goto fail;
4293
4294         receive_flags(lkb, ms);
4295
4296         r = lkb->lkb_resource;
4297
4298         hold_rsb(r);
4299         lock_rsb(r);
4300
4301         error = validate_message(lkb, ms);
4302         if (error)
4303                 goto out;
4304
4305         error = do_cancel(r, lkb);
4306         send_cancel_reply(r, lkb, error);
4307         do_cancel_effects(r, lkb, error);
4308  out:
4309         unlock_rsb(r);
4310         put_rsb(r);
4311         dlm_put_lkb(lkb);
4312         return 0;
4313
4314  fail:
4315         setup_stub_lkb(ls, ms);
4316         send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4317         return error;
4318 }
4319
4320 static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
4321 {
4322         struct dlm_lkb *lkb;
4323         struct dlm_rsb *r;
4324         int error;
4325
4326         error = find_lkb(ls, ms->m_remid, &lkb);
4327         if (error)
4328                 return error;
4329
4330         r = lkb->lkb_resource;
4331
4332         hold_rsb(r);
4333         lock_rsb(r);
4334
4335         error = validate_message(lkb, ms);
4336         if (error)
4337                 goto out;
4338
4339         receive_flags_reply(lkb, ms);
4340         if (is_altmode(lkb))
4341                 munge_altmode(lkb, ms);
4342         grant_lock_pc(r, lkb, ms);
4343         queue_cast(r, lkb, 0);
4344  out:
4345         unlock_rsb(r);
4346         put_rsb(r);
4347         dlm_put_lkb(lkb);
4348         return 0;
4349 }
4350
4351 static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
4352 {
4353         struct dlm_lkb *lkb;
4354         struct dlm_rsb *r;
4355         int error;
4356
4357         error = find_lkb(ls, ms->m_remid, &lkb);
4358         if (error)
4359                 return error;
4360
4361         r = lkb->lkb_resource;
4362
4363         hold_rsb(r);
4364         lock_rsb(r);
4365
4366         error = validate_message(lkb, ms);
4367         if (error)
4368                 goto out;
4369
4370         queue_bast(r, lkb, ms->m_bastmode);
4371         lkb->lkb_highbast = ms->m_bastmode;
4372  out:
4373         unlock_rsb(r);
4374         put_rsb(r);
4375         dlm_put_lkb(lkb);
4376         return 0;
4377 }
4378
4379 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
4380 {
4381         int len, error, ret_nodeid, from_nodeid, our_nodeid;
4382
4383         from_nodeid = ms->m_header.h_nodeid;
4384         our_nodeid = dlm_our_nodeid();
4385
4386         len = receive_extralen(ms);
4387
4388         error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
4389                                   &ret_nodeid, NULL);
4390
4391         /* Optimization: we're master so treat lookup as a request */
4392         if (!error && ret_nodeid == our_nodeid) {
4393                 receive_request(ls, ms);
4394                 return;
4395         }
4396         send_lookup_reply(ls, ms, ret_nodeid, error);
4397 }
4398
4399 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
4400 {
4401         char name[DLM_RESNAME_MAXLEN+1];
4402         struct dlm_rsb *r;
4403         uint32_t hash, b;
4404         int rv, len, dir_nodeid, from_nodeid;
4405
4406         from_nodeid = ms->m_header.h_nodeid;
4407
4408         len = receive_extralen(ms);
4409
4410         if (len > DLM_RESNAME_MAXLEN) {
4411                 log_error(ls, "receive_remove from %d bad len %d",
4412                           from_nodeid, len);
4413                 return;
4414         }
4415
4416         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
4417         if (dir_nodeid != dlm_our_nodeid()) {
4418                 log_error(ls, "receive_remove from %d bad nodeid %d",
4419                           from_nodeid, dir_nodeid);
4420                 return;
4421         }
4422
4423         /* Look for name on rsbtbl.toss, if it's there, kill it.
4424            If it's on rsbtbl.keep, it's being used, and we should ignore this
4425            message.  This is an expected race between the dir node sending a
4426            request to the master node at the same time as the master node sends
4427            a remove to the dir node.  The resolution to that race is for the
4428            dir node to ignore the remove message, and the master node to
4429            recreate the master rsb when it gets a request from the dir node for
4430            an rsb it doesn't have. */
4431
4432         memset(name, 0, sizeof(name));
4433         memcpy(name, ms->m_extra, len);
4434
4435         hash = jhash(name, len, 0);
4436         b = hash & (ls->ls_rsbtbl_size - 1);
4437
4438         spin_lock(&ls->ls_rsbtbl[b].lock);
4439
4440         rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
4441         if (rv) {
4442                 /* verify the rsb is on keep list per comment above */
4443                 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
4444                 if (rv) {
4445                         /* should not happen */
4446                         log_error(ls, "receive_remove from %d not found %s",
4447                                   from_nodeid, name);
4448                         spin_unlock(&ls->ls_rsbtbl[b].lock);
4449                         return;
4450                 }
4451                 if (r->res_master_nodeid != from_nodeid) {
4452                         /* should not happen */
4453                         log_error(ls, "receive_remove keep from %d master %d",
4454                                   from_nodeid, r->res_master_nodeid);
4455                         dlm_print_rsb(r);
4456                         spin_unlock(&ls->ls_rsbtbl[b].lock);
4457                         return;
4458                 }
4459
4460                 log_debug(ls, "receive_remove from %d master %d first %x %s",
4461                           from_nodeid, r->res_master_nodeid, r->res_first_lkid,
4462                           name);
4463                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4464                 return;
4465         }
4466
4467         if (r->res_master_nodeid != from_nodeid) {
4468                 log_error(ls, "receive_remove toss from %d master %d",
4469                           from_nodeid, r->res_master_nodeid);
4470                 dlm_print_rsb(r);
4471                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4472                 return;
4473         }
4474
4475         if (kref_put(&r->res_ref, kill_rsb)) {
4476                 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
4477                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4478                 dlm_free_rsb(r);
4479         } else {
4480                 log_error(ls, "receive_remove from %d rsb ref error",
4481                           from_nodeid);
4482                 dlm_print_rsb(r);
4483                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4484         }
4485 }
4486
4487 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
4488 {
4489         do_purge(ls, ms->m_nodeid, ms->m_pid);
4490 }
4491
4492 static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
4493 {
4494         struct dlm_lkb *lkb;
4495         struct dlm_rsb *r;
4496         int error, mstype, result;
4497         int from_nodeid = ms->m_header.h_nodeid;
4498
4499         error = find_lkb(ls, ms->m_remid, &lkb);
4500         if (error)
4501                 return error;
4502
4503         r = lkb->lkb_resource;
4504         hold_rsb(r);
4505         lock_rsb(r);
4506
4507         error = validate_message(lkb, ms);
4508         if (error)
4509                 goto out;
4510
4511         mstype = lkb->lkb_wait_type;
4512         error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
4513         if (error) {
4514                 log_error(ls, "receive_request_reply %x remote %d %x result %d",
4515                           lkb->lkb_id, from_nodeid, ms->m_lkid, ms->m_result);
4516                 dlm_dump_rsb(r);
4517                 goto out;
4518         }
4519
4520         /* Optimization: the dir node was also the master, so it took our
4521            lookup as a request and sent request reply instead of lookup reply */
4522         if (mstype == DLM_MSG_LOOKUP) {
4523                 r->res_master_nodeid = from_nodeid;
4524                 r->res_nodeid = from_nodeid;
4525                 lkb->lkb_nodeid = from_nodeid;
4526         }
4527
4528         /* this is the value returned from do_request() on the master */
4529         result = ms->m_result;
4530
4531         switch (result) {
4532         case -EAGAIN:
4533                 /* request would block (be queued) on remote master */
4534                 queue_cast(r, lkb, -EAGAIN);
4535                 confirm_master(r, -EAGAIN);
4536                 unhold_lkb(lkb); /* undoes create_lkb() */
4537                 break;
4538
4539         case -EINPROGRESS:
4540         case 0:
4541                 /* request was queued or granted on remote master */
4542                 receive_flags_reply(lkb, ms);
4543                 lkb->lkb_remid = ms->m_lkid;
4544                 if (is_altmode(lkb))
4545                         munge_altmode(lkb, ms);
4546                 if (result) {
4547                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
4548                         add_timeout(lkb);
4549                 } else {
4550                         grant_lock_pc(r, lkb, ms);
4551                         queue_cast(r, lkb, 0);
4552                 }
4553                 confirm_master(r, result);
4554                 break;
4555
4556         case -EBADR:
4557         case -ENOTBLK:
4558                 /* find_rsb failed to find rsb or rsb wasn't master */
4559                 log_limit(ls, "receive_request_reply %x from %d %d "
4560                           "master %d dir %d first %x %s", lkb->lkb_id,
4561                           from_nodeid, result, r->res_master_nodeid,
4562                           r->res_dir_nodeid, r->res_first_lkid, r->res_name);
4563
4564                 if (r->res_dir_nodeid != dlm_our_nodeid() &&
4565                     r->res_master_nodeid != dlm_our_nodeid()) {
4566                         /* cause _request_lock->set_master->send_lookup */
4567                         r->res_master_nodeid = 0;
4568                         r->res_nodeid = -1;
4569                         lkb->lkb_nodeid = -1;
4570                 }
4571
4572                 if (is_overlap(lkb)) {
4573                         /* we'll ignore error in cancel/unlock reply */
4574                         queue_cast_overlap(r, lkb);
4575                         confirm_master(r, result);
4576                         unhold_lkb(lkb); /* undoes create_lkb() */
4577                 } else {
4578                         _request_lock(r, lkb);
4579
4580                         if (r->res_master_nodeid == dlm_our_nodeid())
4581                                 confirm_master(r, 0);
4582                 }
4583                 break;
4584
4585         default:
4586                 log_error(ls, "receive_request_reply %x error %d",
4587                           lkb->lkb_id, result);
4588         }
4589
4590         if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
4591                 log_debug(ls, "receive_request_reply %x result %d unlock",
4592                           lkb->lkb_id, result);
4593                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4594                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4595                 send_unlock(r, lkb);
4596         } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
4597                 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
4598                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4599                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4600                 send_cancel(r, lkb);
4601         } else {
4602                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4603                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4604         }
4605  out:
4606         unlock_rsb(r);
4607         put_rsb(r);
4608         dlm_put_lkb(lkb);
4609         return 0;
4610 }
4611
4612 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4613                                     struct dlm_message *ms)
4614 {
4615         /* this is the value returned from do_convert() on the master */
4616         switch (ms->m_result) {
4617         case -EAGAIN:
4618                 /* convert would block (be queued) on remote master */
4619                 queue_cast(r, lkb, -EAGAIN);
4620                 break;
4621
4622         case -EDEADLK:
4623                 receive_flags_reply(lkb, ms);
4624                 revert_lock_pc(r, lkb);
4625                 queue_cast(r, lkb, -EDEADLK);
4626                 break;
4627
4628         case -EINPROGRESS:
4629                 /* convert was queued on remote master */
4630                 receive_flags_reply(lkb, ms);
4631                 if (is_demoted(lkb))
4632                         munge_demoted(lkb);
4633                 del_lkb(r, lkb);
4634                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
4635                 add_timeout(lkb);
4636                 break;
4637
4638         case 0:
4639                 /* convert was granted on remote master */
4640                 receive_flags_reply(lkb, ms);
4641                 if (is_demoted(lkb))
4642                         munge_demoted(lkb);
4643                 grant_lock_pc(r, lkb, ms);
4644                 queue_cast(r, lkb, 0);
4645                 break;
4646
4647         default:
4648                 log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
4649                           lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid,
4650                           ms->m_result);
4651                 dlm_print_rsb(r);
4652                 dlm_print_lkb(lkb);
4653         }
4654 }
4655
4656 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4657 {
4658         struct dlm_rsb *r = lkb->lkb_resource;
4659         int error;
4660
4661         hold_rsb(r);
4662         lock_rsb(r);
4663
4664         error = validate_message(lkb, ms);
4665         if (error)
4666                 goto out;
4667
4668         /* stub reply can happen with waiters_mutex held */
4669         error = remove_from_waiters_ms(lkb, ms);
4670         if (error)
4671                 goto out;
4672
4673         __receive_convert_reply(r, lkb, ms);
4674  out:
4675         unlock_rsb(r);
4676         put_rsb(r);
4677 }
4678
4679 static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
4680 {
4681         struct dlm_lkb *lkb;
4682         int error;
4683
4684         error = find_lkb(ls, ms->m_remid, &lkb);
4685         if (error)
4686                 return error;
4687
4688         _receive_convert_reply(lkb, ms);
4689         dlm_put_lkb(lkb);
4690         return 0;
4691 }
4692
4693 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4694 {
4695         struct dlm_rsb *r = lkb->lkb_resource;
4696         int error;
4697
4698         hold_rsb(r);
4699         lock_rsb(r);
4700
4701         error = validate_message(lkb, ms);
4702         if (error)
4703                 goto out;
4704
4705         /* stub reply can happen with waiters_mutex held */
4706         error = remove_from_waiters_ms(lkb, ms);
4707         if (error)
4708                 goto out;
4709
4710         /* this is the value returned from do_unlock() on the master */
4711
4712         switch (ms->m_result) {
4713         case -DLM_EUNLOCK:
4714                 receive_flags_reply(lkb, ms);
4715                 remove_lock_pc(r, lkb);
4716                 queue_cast(r, lkb, -DLM_EUNLOCK);
4717                 break;
4718         case -ENOENT:
4719                 break;
4720         default:
4721                 log_error(r->res_ls, "receive_unlock_reply %x error %d",
4722                           lkb->lkb_id, ms->m_result);
4723         }
4724  out:
4725         unlock_rsb(r);
4726         put_rsb(r);
4727 }
4728
4729 static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
4730 {
4731         struct dlm_lkb *lkb;
4732         int error;
4733
4734         error = find_lkb(ls, ms->m_remid, &lkb);
4735         if (error)
4736                 return error;
4737
4738         _receive_unlock_reply(lkb, ms);
4739         dlm_put_lkb(lkb);
4740         return 0;
4741 }
4742
4743 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4744 {
4745         struct dlm_rsb *r = lkb->lkb_resource;
4746         int error;
4747
4748         hold_rsb(r);
4749         lock_rsb(r);
4750
4751         error = validate_message(lkb, ms);
4752         if (error)
4753                 goto out;
4754
4755         /* stub reply can happen with waiters_mutex held */
4756         error = remove_from_waiters_ms(lkb, ms);
4757         if (error)
4758                 goto out;
4759
4760         /* this is the value returned from do_cancel() on the master */
4761
4762         switch (ms->m_result) {
4763         case -DLM_ECANCEL:
4764                 receive_flags_reply(lkb, ms);
4765                 revert_lock_pc(r, lkb);
4766                 queue_cast(r, lkb, -DLM_ECANCEL);
4767                 break;
4768         case 0:
4769                 break;
4770         default:
4771                 log_error(r->res_ls, "receive_cancel_reply %x error %d",
4772                           lkb->lkb_id, ms->m_result);
4773         }
4774  out:
4775         unlock_rsb(r);
4776         put_rsb(r);
4777 }
4778
4779 static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
4780 {
4781         struct dlm_lkb *lkb;
4782         int error;
4783
4784         error = find_lkb(ls, ms->m_remid, &lkb);
4785         if (error)
4786                 return error;
4787
4788         _receive_cancel_reply(lkb, ms);
4789         dlm_put_lkb(lkb);
4790         return 0;
4791 }
4792
4793 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
4794 {
4795         struct dlm_lkb *lkb;
4796         struct dlm_rsb *r;
4797         int error, ret_nodeid;
4798         int do_lookup_list = 0;
4799
4800         error = find_lkb(ls, ms->m_lkid, &lkb);
4801         if (error) {
4802                 log_error(ls, "receive_lookup_reply no lkid %x", ms->m_lkid);
4803                 return;
4804         }
4805
4806         /* ms->m_result is the value returned by dlm_master_lookup on dir node
4807            FIXME: will a non-zero error ever be returned? */
4808
4809         r = lkb->lkb_resource;
4810         hold_rsb(r);
4811         lock_rsb(r);
4812
4813         error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
4814         if (error)
4815                 goto out;
4816
4817         ret_nodeid = ms->m_nodeid;
4818
4819         /* We sometimes receive a request from the dir node for this
4820            rsb before we've received the dir node's loookup_reply for it.
4821            The request from the dir node implies we're the master, so we set
4822            ourself as master in receive_request_reply, and verify here that
4823            we are indeed the master. */
4824
4825         if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
4826                 /* This should never happen */
4827                 log_error(ls, "receive_lookup_reply %x from %d ret %d "
4828                           "master %d dir %d our %d first %x %s",
4829                           lkb->lkb_id, ms->m_header.h_nodeid, ret_nodeid,
4830                           r->res_master_nodeid, r->res_dir_nodeid,
4831                           dlm_our_nodeid(), r->res_first_lkid, r->res_name);
4832         }
4833
4834         if (ret_nodeid == dlm_our_nodeid()) {
4835                 r->res_master_nodeid = ret_nodeid;
4836                 r->res_nodeid = 0;
4837                 do_lookup_list = 1;
4838                 r->res_first_lkid = 0;
4839         } else if (ret_nodeid == -1) {
4840                 /* the remote node doesn't believe it's the dir node */
4841                 log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
4842                           lkb->lkb_id, ms->m_header.h_nodeid);
4843                 r->res_master_nodeid = 0;
4844                 r->res_nodeid = -1;
4845                 lkb->lkb_nodeid = -1;
4846         } else {
4847                 /* set_master() will set lkb_nodeid from r */
4848                 r->res_master_nodeid = ret_nodeid;
4849                 r->res_nodeid = ret_nodeid;
4850         }
4851
4852         if (is_overlap(lkb)) {
4853                 log_debug(ls, "receive_lookup_reply %x unlock %x",
4854                           lkb->lkb_id, lkb->lkb_flags);
4855                 queue_cast_overlap(r, lkb);
4856                 unhold_lkb(lkb); /* undoes create_lkb() */
4857                 goto out_list;
4858         }
4859
4860         _request_lock(r, lkb);
4861
4862  out_list:
4863         if (do_lookup_list)
4864                 process_lookup_list(r);
4865  out:
4866         unlock_rsb(r);
4867         put_rsb(r);
4868         dlm_put_lkb(lkb);
4869 }
4870
4871 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4872                              uint32_t saved_seq)
4873 {
4874         int error = 0, noent = 0;
4875
4876         if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
4877                 log_limit(ls, "receive %d from non-member %d %x %x %d",
4878                           ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
4879                           ms->m_remid, ms->m_result);
4880                 return;
4881         }
4882
4883         switch (ms->m_type) {
4884
4885         /* messages sent to a master node */
4886
4887         case DLM_MSG_REQUEST:
4888                 error = receive_request(ls, ms);
4889                 break;
4890
4891         case DLM_MSG_CONVERT:
4892                 error = receive_convert(ls, ms);
4893                 break;
4894
4895         case DLM_MSG_UNLOCK:
4896                 error = receive_unlock(ls, ms);
4897                 break;
4898
4899         case DLM_MSG_CANCEL:
4900                 noent = 1;
4901                 error = receive_cancel(ls, ms);
4902                 break;
4903
4904         /* messages sent from a master node (replies to above) */
4905
4906         case DLM_MSG_REQUEST_REPLY:
4907                 error = receive_request_reply(ls, ms);
4908                 break;
4909
4910         case DLM_MSG_CONVERT_REPLY:
4911                 error = receive_convert_reply(ls, ms);
4912                 break;
4913
4914         case DLM_MSG_UNLOCK_REPLY:
4915                 error = receive_unlock_reply(ls, ms);
4916                 break;
4917
4918         case DLM_MSG_CANCEL_REPLY:
4919                 error = receive_cancel_reply(ls, ms);
4920                 break;
4921
4922         /* messages sent from a master node (only two types of async msg) */
4923
4924         case DLM_MSG_GRANT:
4925                 noent = 1;
4926                 error = receive_grant(ls, ms);
4927                 break;
4928
4929         case DLM_MSG_BAST:
4930                 noent = 1;
4931                 error = receive_bast(ls, ms);
4932                 break;
4933
4934         /* messages sent to a dir node */
4935
4936         case DLM_MSG_LOOKUP:
4937                 receive_lookup(ls, ms);
4938                 break;
4939
4940         case DLM_MSG_REMOVE:
4941                 receive_remove(ls, ms);
4942                 break;
4943
4944         /* messages sent from a dir node (remove has no reply) */
4945
4946         case DLM_MSG_LOOKUP_REPLY:
4947                 receive_lookup_reply(ls, ms);
4948                 break;
4949
4950         /* other messages */
4951
4952         case DLM_MSG_PURGE:
4953                 receive_purge(ls, ms);
4954                 break;
4955
4956         default:
4957                 log_error(ls, "unknown message type %d", ms->m_type);
4958         }
4959
4960         /*
4961          * When checking for ENOENT, we're checking the result of
4962          * find_lkb(m_remid):
4963          *
4964          * The lock id referenced in the message wasn't found.  This may
4965          * happen in normal usage for the async messages and cancel, so
4966          * only use log_debug for them.
4967          *
4968          * Some errors are expected and normal.
4969          */
4970
4971         if (error == -ENOENT && noent) {
4972                 log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
4973                           ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
4974                           ms->m_lkid, saved_seq);
4975         } else if (error == -ENOENT) {
4976                 log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
4977                           ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
4978                           ms->m_lkid, saved_seq);
4979
4980                 if (ms->m_type == DLM_MSG_CONVERT)
4981                         dlm_dump_rsb_hash(ls, ms->m_hash);
4982         }
4983
4984         if (error == -EINVAL) {
4985                 log_error(ls, "receive %d inval from %d lkid %x remid %x "
4986                           "saved_seq %u",
4987                           ms->m_type, ms->m_header.h_nodeid,
4988                           ms->m_lkid, ms->m_remid, saved_seq);
4989         }
4990 }
4991
4992 /* If the lockspace is in recovery mode (locking stopped), then normal
4993    messages are saved on the requestqueue for processing after recovery is
4994    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
4995    messages off the requestqueue before we process new ones. This occurs right
4996    after recovery completes when we transition from saving all messages on
4997    requestqueue, to processing all the saved messages, to processing new
4998    messages as they arrive. */
4999
5000 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
5001                                 int nodeid)
5002 {
5003         if (dlm_locking_stopped(ls)) {
5004                 /* If we were a member of this lockspace, left, and rejoined,
5005                    other nodes may still be sending us messages from the
5006                    lockspace generation before we left. */
5007                 if (!ls->ls_generation) {
5008                         log_limit(ls, "receive %d from %d ignore old gen",
5009                                   ms->m_type, nodeid);
5010                         return;
5011                 }
5012
5013                 dlm_add_requestqueue(ls, nodeid, ms);
5014         } else {
5015                 dlm_wait_requestqueue(ls);
5016                 _receive_message(ls, ms, 0);
5017         }
5018 }
5019
5020 /* This is called by dlm_recoverd to process messages that were saved on
5021    the requestqueue. */
5022
5023 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
5024                                uint32_t saved_seq)
5025 {
5026         _receive_message(ls, ms, saved_seq);
5027 }
5028
5029 /* This is called by the midcomms layer when something is received for
5030    the lockspace.  It could be either a MSG (normal message sent as part of
5031    standard locking activity) or an RCOM (recovery message sent as part of
5032    lockspace recovery). */
5033
5034 void dlm_receive_buffer(union dlm_packet *p, int nodeid)
5035 {
5036         struct dlm_header *hd = &p->header;
5037         struct dlm_ls *ls;
5038         int type = 0;
5039
5040         switch (hd->h_cmd) {
5041         case DLM_MSG:
5042                 dlm_message_in(&p->message);
5043                 type = p->message.m_type;
5044                 break;
5045         case DLM_RCOM:
5046                 dlm_rcom_in(&p->rcom);
5047                 type = p->rcom.rc_type;
5048                 break;
5049         default:
5050                 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
5051                 return;
5052         }
5053
5054         if (hd->h_nodeid != nodeid) {
5055                 log_print("invalid h_nodeid %d from %d lockspace %x",
5056                           hd->h_nodeid, nodeid, hd->h_lockspace);
5057                 return;
5058         }
5059
5060         ls = dlm_find_lockspace_global(hd->h_lockspace);
5061         if (!ls) {
5062                 if (dlm_config.ci_log_debug) {
5063                         printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
5064                                 "%u from %d cmd %d type %d\n",
5065                                 hd->h_lockspace, nodeid, hd->h_cmd, type);
5066                 }
5067
5068                 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
5069                         dlm_send_ls_not_ready(nodeid, &p->rcom);
5070                 return;
5071         }
5072
5073         /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
5074            be inactive (in this ls) before transitioning to recovery mode */
5075
5076         down_read(&ls->ls_recv_active);
5077         if (hd->h_cmd == DLM_MSG)
5078                 dlm_receive_message(ls, &p->message, nodeid);
5079         else
5080                 dlm_receive_rcom(ls, &p->rcom, nodeid);
5081         up_read(&ls->ls_recv_active);
5082
5083         dlm_put_lockspace(ls);
5084 }
5085
5086 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
5087                                    struct dlm_message *ms_stub)
5088 {
5089         if (middle_conversion(lkb)) {
5090                 hold_lkb(lkb);
5091                 memset(ms_stub, 0, sizeof(struct dlm_message));
5092                 ms_stub->m_flags = DLM_IFL_STUB_MS;
5093                 ms_stub->m_type = DLM_MSG_CONVERT_REPLY;
5094                 ms_stub->m_result = -EINPROGRESS;
5095                 ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
5096                 _receive_convert_reply(lkb, ms_stub);
5097
5098                 /* Same special case as in receive_rcom_lock_args() */
5099                 lkb->lkb_grmode = DLM_LOCK_IV;
5100                 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
5101                 unhold_lkb(lkb);
5102
5103         } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
5104                 lkb->lkb_flags |= DLM_IFL_RESEND;
5105         }
5106
5107         /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
5108            conversions are async; there's no reply from the remote master */
5109 }
5110
5111 /* A waiting lkb needs recovery if the master node has failed, or
5112    the master node is changing (only when no directory is used) */
5113
5114 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
5115                                  int dir_nodeid)
5116 {
5117         if (dlm_no_directory(ls))
5118                 return 1;
5119
5120         if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
5121                 return 1;
5122
5123         return 0;
5124 }
5125
5126 /* Recovery for locks that are waiting for replies from nodes that are now
5127    gone.  We can just complete unlocks and cancels by faking a reply from the
5128    dead node.  Requests and up-conversions we flag to be resent after
5129    recovery.  Down-conversions can just be completed with a fake reply like
5130    unlocks.  Conversions between PR and CW need special attention. */
5131
5132 void dlm_recover_waiters_pre(struct dlm_ls *ls)
5133 {
5134         struct dlm_lkb *lkb, *safe;
5135         struct dlm_message *ms_stub;
5136         int wait_type, stub_unlock_result, stub_cancel_result;
5137         int dir_nodeid;
5138
5139         ms_stub = kmalloc(sizeof(*ms_stub), GFP_KERNEL);
5140         if (!ms_stub)
5141                 return;
5142
5143         mutex_lock(&ls->ls_waiters_mutex);
5144
5145         list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
5146
5147                 dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
5148
5149                 /* exclude debug messages about unlocks because there can be so
5150                    many and they aren't very interesting */
5151
5152                 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
5153                         log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5154                                   "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
5155                                   lkb->lkb_id,
5156                                   lkb->lkb_remid,
5157                                   lkb->lkb_wait_type,
5158                                   lkb->lkb_resource->res_nodeid,
5159                                   lkb->lkb_nodeid,
5160                                   lkb->lkb_wait_nodeid,
5161                                   dir_nodeid);
5162                 }
5163
5164                 /* all outstanding lookups, regardless of destination  will be
5165                    resent after recovery is done */
5166
5167                 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
5168                         lkb->lkb_flags |= DLM_IFL_RESEND;
5169                         continue;
5170                 }
5171
5172                 if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
5173                         continue;
5174
5175                 wait_type = lkb->lkb_wait_type;
5176                 stub_unlock_result = -DLM_EUNLOCK;
5177                 stub_cancel_result = -DLM_ECANCEL;
5178
5179                 /* Main reply may have been received leaving a zero wait_type,
5180                    but a reply for the overlapping op may not have been
5181                    received.  In that case we need to fake the appropriate
5182                    reply for the overlap op. */
5183
5184                 if (!wait_type) {
5185                         if (is_overlap_cancel(lkb)) {
5186                                 wait_type = DLM_MSG_CANCEL;
5187                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
5188                                         stub_cancel_result = 0;
5189                         }
5190                         if (is_overlap_unlock(lkb)) {
5191                                 wait_type = DLM_MSG_UNLOCK;
5192                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
5193                                         stub_unlock_result = -ENOENT;
5194                         }
5195
5196                         log_debug(ls, "rwpre overlap %x %x %d %d %d",
5197                                   lkb->lkb_id, lkb->lkb_flags, wait_type,
5198                                   stub_cancel_result, stub_unlock_result);
5199                 }
5200
5201                 switch (wait_type) {
5202
5203                 case DLM_MSG_REQUEST:
5204                         lkb->lkb_flags |= DLM_IFL_RESEND;
5205                         break;
5206
5207                 case DLM_MSG_CONVERT:
5208                         recover_convert_waiter(ls, lkb, ms_stub);
5209                         break;
5210
5211                 case DLM_MSG_UNLOCK:
5212                         hold_lkb(lkb);
5213                         memset(ms_stub, 0, sizeof(struct dlm_message));
5214                         ms_stub->m_flags = DLM_IFL_STUB_MS;
5215                         ms_stub->m_type = DLM_MSG_UNLOCK_REPLY;
5216                         ms_stub->m_result = stub_unlock_result;
5217                         ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
5218                         _receive_unlock_reply(lkb, ms_stub);
5219                         dlm_put_lkb(lkb);
5220                         break;
5221
5222                 case DLM_MSG_CANCEL:
5223                         hold_lkb(lkb);
5224                         memset(ms_stub, 0, sizeof(struct dlm_message));
5225                         ms_stub->m_flags = DLM_IFL_STUB_MS;
5226                         ms_stub->m_type = DLM_MSG_CANCEL_REPLY;
5227                         ms_stub->m_result = stub_cancel_result;
5228                         ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
5229                         _receive_cancel_reply(lkb, ms_stub);
5230                         dlm_put_lkb(lkb);
5231                         break;
5232
5233                 default:
5234                         log_error(ls, "invalid lkb wait_type %d %d",
5235                                   lkb->lkb_wait_type, wait_type);
5236                 }
5237                 schedule();
5238         }
5239         mutex_unlock(&ls->ls_waiters_mutex);
5240         kfree(ms_stub);
5241 }
5242
5243 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
5244 {
5245         struct dlm_lkb *lkb = NULL, *iter;
5246
5247         mutex_lock(&ls->ls_waiters_mutex);
5248         list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) {
5249                 if (iter->lkb_flags & DLM_IFL_RESEND) {
5250                         hold_lkb(iter);
5251                         lkb = iter;
5252                         break;
5253                 }
5254         }
5255         mutex_unlock(&ls->ls_waiters_mutex);
5256
5257         return lkb;
5258 }
5259
5260 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
5261    master or dir-node for r.  Processing the lkb may result in it being placed
5262    back on waiters. */
5263
5264 /* We do this after normal locking has been enabled and any saved messages
5265    (in requestqueue) have been processed.  We should be confident that at
5266    this point we won't get or process a reply to any of these waiting
5267    operations.  But, new ops may be coming in on the rsbs/locks here from
5268    userspace or remotely. */
5269
5270 /* there may have been an overlap unlock/cancel prior to recovery or after
5271    recovery.  if before, the lkb may still have a pos wait_count; if after, the
5272    overlap flag would just have been set and nothing new sent.  we can be
5273    confident here than any replies to either the initial op or overlap ops
5274    prior to recovery have been received. */
5275
5276 int dlm_recover_waiters_post(struct dlm_ls *ls)
5277 {
5278         struct dlm_lkb *lkb;
5279         struct dlm_rsb *r;
5280         int error = 0, mstype, err, oc, ou;
5281
5282         while (1) {
5283                 if (dlm_locking_stopped(ls)) {
5284                         log_debug(ls, "recover_waiters_post aborted");
5285                         error = -EINTR;
5286                         break;
5287                 }
5288
5289                 lkb = find_resend_waiter(ls);
5290                 if (!lkb)
5291                         break;
5292
5293                 r = lkb->lkb_resource;
5294                 hold_rsb(r);
5295                 lock_rsb(r);
5296
5297                 mstype = lkb->lkb_wait_type;
5298                 oc = is_overlap_cancel(lkb);
5299                 ou = is_overlap_unlock(lkb);
5300                 err = 0;
5301
5302                 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5303                           "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
5304                           "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
5305                           r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
5306                           dlm_dir_nodeid(r), oc, ou);
5307
5308                 /* At this point we assume that we won't get a reply to any
5309                    previous op or overlap op on this lock.  First, do a big
5310                    remove_from_waiters() for all previous ops. */
5311
5312                 lkb->lkb_flags &= ~DLM_IFL_RESEND;
5313                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
5314                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
5315                 lkb->lkb_wait_type = 0;
5316                 /* drop all wait_count references we still
5317                  * hold a reference for this iteration.
5318                  */
5319                 while (lkb->lkb_wait_count) {
5320                         lkb->lkb_wait_count--;
5321                         unhold_lkb(lkb);
5322                 }
5323                 mutex_lock(&ls->ls_waiters_mutex);
5324                 list_del_init(&lkb->lkb_wait_reply);
5325                 mutex_unlock(&ls->ls_waiters_mutex);
5326
5327                 if (oc || ou) {
5328                         /* do an unlock or cancel instead of resending */
5329                         switch (mstype) {
5330                         case DLM_MSG_LOOKUP:
5331                         case DLM_MSG_REQUEST:
5332                                 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
5333                                                         -DLM_ECANCEL);
5334                                 unhold_lkb(lkb); /* undoes create_lkb() */
5335                                 break;
5336                         case DLM_MSG_CONVERT:
5337                                 if (oc) {
5338                                         queue_cast(r, lkb, -DLM_ECANCEL);
5339                                 } else {
5340                                         lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
5341                                         _unlock_lock(r, lkb);
5342                                 }
5343                                 break;
5344                         default:
5345                                 err = 1;
5346                         }
5347                 } else {
5348                         switch (mstype) {
5349                         case DLM_MSG_LOOKUP:
5350                         case DLM_MSG_REQUEST:
5351                                 _request_lock(r, lkb);
5352                                 if (is_master(r))
5353                                         confirm_master(r, 0);
5354                                 break;
5355                         case DLM_MSG_CONVERT:
5356                                 _convert_lock(r, lkb);
5357                                 break;
5358                         default:
5359                                 err = 1;
5360                         }
5361                 }
5362
5363                 if (err) {
5364                         log_error(ls, "waiter %x msg %d r_nodeid %d "
5365                                   "dir_nodeid %d overlap %d %d",
5366                                   lkb->lkb_id, mstype, r->res_nodeid,
5367                                   dlm_dir_nodeid(r), oc, ou);
5368                 }
5369                 unlock_rsb(r);
5370                 put_rsb(r);
5371                 dlm_put_lkb(lkb);
5372         }
5373
5374         return error;
5375 }
5376
5377 static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
5378                               struct list_head *list)
5379 {
5380         struct dlm_lkb *lkb, *safe;
5381
5382         list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5383                 if (!is_master_copy(lkb))
5384                         continue;
5385
5386                 /* don't purge lkbs we've added in recover_master_copy for
5387                    the current recovery seq */
5388
5389                 if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5390                         continue;
5391
5392                 del_lkb(r, lkb);
5393
5394                 /* this put should free the lkb */
5395                 if (!dlm_put_lkb(lkb))
5396                         log_error(ls, "purged mstcpy lkb not released");
5397         }
5398 }
5399
5400 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
5401 {
5402         struct dlm_ls *ls = r->res_ls;
5403
5404         purge_mstcpy_list(ls, r, &r->res_grantqueue);
5405         purge_mstcpy_list(ls, r, &r->res_convertqueue);
5406         purge_mstcpy_list(ls, r, &r->res_waitqueue);
5407 }
5408
5409 static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
5410                             struct list_head *list,
5411                             int nodeid_gone, unsigned int *count)
5412 {
5413         struct dlm_lkb *lkb, *safe;
5414
5415         list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5416                 if (!is_master_copy(lkb))
5417                         continue;
5418
5419                 if ((lkb->lkb_nodeid == nodeid_gone) ||
5420                     dlm_is_removed(ls, lkb->lkb_nodeid)) {
5421
5422                         /* tell recover_lvb to invalidate the lvb
5423                            because a node holding EX/PW failed */
5424                         if ((lkb->lkb_exflags & DLM_LKF_VALBLK) &&
5425                             (lkb->lkb_grmode >= DLM_LOCK_PW)) {
5426                                 rsb_set_flag(r, RSB_RECOVER_LVB_INVAL);
5427                         }
5428
5429                         del_lkb(r, lkb);
5430
5431                         /* this put should free the lkb */
5432                         if (!dlm_put_lkb(lkb))
5433                                 log_error(ls, "purged dead lkb not released");
5434
5435                         rsb_set_flag(r, RSB_RECOVER_GRANT);
5436
5437                         (*count)++;
5438                 }
5439         }
5440 }
5441
5442 /* Get rid of locks held by nodes that are gone. */
5443
5444 void dlm_recover_purge(struct dlm_ls *ls)
5445 {
5446         struct dlm_rsb *r;
5447         struct dlm_member *memb;
5448         int nodes_count = 0;
5449         int nodeid_gone = 0;
5450         unsigned int lkb_count = 0;
5451
5452         /* cache one removed nodeid to optimize the common
5453            case of a single node removed */
5454
5455         list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
5456                 nodes_count++;
5457                 nodeid_gone = memb->nodeid;
5458         }
5459
5460         if (!nodes_count)
5461                 return;
5462
5463         down_write(&ls->ls_root_sem);
5464         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
5465                 hold_rsb(r);
5466                 lock_rsb(r);
5467                 if (is_master(r)) {
5468                         purge_dead_list(ls, r, &r->res_grantqueue,
5469                                         nodeid_gone, &lkb_count);
5470                         purge_dead_list(ls, r, &r->res_convertqueue,
5471                                         nodeid_gone, &lkb_count);
5472                         purge_dead_list(ls, r, &r->res_waitqueue,
5473                                         nodeid_gone, &lkb_count);
5474                 }
5475                 unlock_rsb(r);
5476                 unhold_rsb(r);
5477                 cond_resched();
5478         }
5479         up_write(&ls->ls_root_sem);
5480
5481         if (lkb_count)
5482                 log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes",
5483                           lkb_count, nodes_count);
5484 }
5485
5486 static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
5487 {
5488         struct rb_node *n;
5489         struct dlm_rsb *r;
5490
5491         spin_lock(&ls->ls_rsbtbl[bucket].lock);
5492         for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
5493                 r = rb_entry(n, struct dlm_rsb, res_hashnode);
5494
5495                 if (!rsb_flag(r, RSB_RECOVER_GRANT))
5496                         continue;
5497                 if (!is_master(r)) {
5498                         rsb_clear_flag(r, RSB_RECOVER_GRANT);
5499                         continue;
5500                 }
5501                 hold_rsb(r);
5502                 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5503                 return r;
5504         }
5505         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5506         return NULL;
5507 }
5508
5509 /*
5510  * Attempt to grant locks on resources that we are the master of.
5511  * Locks may have become grantable during recovery because locks
5512  * from departed nodes have been purged (or not rebuilt), allowing
5513  * previously blocked locks to now be granted.  The subset of rsb's
5514  * we are interested in are those with lkb's on either the convert or
5515  * waiting queues.
5516  *
5517  * Simplest would be to go through each master rsb and check for non-empty
5518  * convert or waiting queues, and attempt to grant on those rsbs.
5519  * Checking the queues requires lock_rsb, though, for which we'd need
5520  * to release the rsbtbl lock.  This would make iterating through all
5521  * rsb's very inefficient.  So, we rely on earlier recovery routines
5522  * to set RECOVER_GRANT on any rsb's that we should attempt to grant
5523  * locks for.
5524  */
5525
5526 void dlm_recover_grant(struct dlm_ls *ls)
5527 {
5528         struct dlm_rsb *r;
5529         int bucket = 0;
5530         unsigned int count = 0;
5531         unsigned int rsb_count = 0;
5532         unsigned int lkb_count = 0;
5533
5534         while (1) {
5535                 r = find_grant_rsb(ls, bucket);
5536                 if (!r) {
5537                         if (bucket == ls->ls_rsbtbl_size - 1)
5538                                 break;
5539                         bucket++;
5540                         continue;
5541                 }
5542                 rsb_count++;
5543                 count = 0;
5544                 lock_rsb(r);
5545                 /* the RECOVER_GRANT flag is checked in the grant path */
5546                 grant_pending_locks(r, &count);
5547                 rsb_clear_flag(r, RSB_RECOVER_GRANT);
5548                 lkb_count += count;
5549                 confirm_master(r, 0);
5550                 unlock_rsb(r);
5551                 put_rsb(r);
5552                 cond_resched();
5553         }
5554
5555         if (lkb_count)
5556                 log_rinfo(ls, "dlm_recover_grant %u locks on %u resources",
5557                           lkb_count, rsb_count);
5558 }
5559
5560 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
5561                                          uint32_t remid)
5562 {
5563         struct dlm_lkb *lkb;
5564
5565         list_for_each_entry(lkb, head, lkb_statequeue) {
5566                 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
5567                         return lkb;
5568         }
5569         return NULL;
5570 }
5571
5572 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
5573                                     uint32_t remid)
5574 {
5575         struct dlm_lkb *lkb;
5576
5577         lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
5578         if (lkb)
5579                 return lkb;
5580         lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
5581         if (lkb)
5582                 return lkb;
5583         lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
5584         if (lkb)
5585                 return lkb;
5586         return NULL;
5587 }
5588
5589 /* needs at least dlm_rcom + rcom_lock */
5590 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5591                                   struct dlm_rsb *r, struct dlm_rcom *rc)
5592 {
5593         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5594
5595         lkb->lkb_nodeid = rc->rc_header.h_nodeid;
5596         lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5597         lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5598         lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5599         lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
5600         lkb->lkb_flags |= DLM_IFL_MSTCPY;
5601         lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
5602         lkb->lkb_rqmode = rl->rl_rqmode;
5603         lkb->lkb_grmode = rl->rl_grmode;
5604         /* don't set lkb_status because add_lkb wants to itself */
5605
5606         lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
5607         lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
5608
5609         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
5610                 int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
5611                          sizeof(struct rcom_lock);
5612                 if (lvblen > ls->ls_lvblen)
5613                         return -EINVAL;
5614                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
5615                 if (!lkb->lkb_lvbptr)
5616                         return -ENOMEM;
5617                 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
5618         }
5619
5620         /* Conversions between PR and CW (middle modes) need special handling.
5621            The real granted mode of these converting locks cannot be determined
5622            until all locks have been rebuilt on the rsb (recover_conversion) */
5623
5624         if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
5625             middle_conversion(lkb)) {
5626                 rl->rl_status = DLM_LKSTS_CONVERT;
5627                 lkb->lkb_grmode = DLM_LOCK_IV;
5628                 rsb_set_flag(r, RSB_RECOVER_CONVERT);
5629         }
5630
5631         return 0;
5632 }
5633
5634 /* This lkb may have been recovered in a previous aborted recovery so we need
5635    to check if the rsb already has an lkb with the given remote nodeid/lkid.
5636    If so we just send back a standard reply.  If not, we create a new lkb with
5637    the given values and send back our lkid.  We send back our lkid by sending
5638    back the rcom_lock struct we got but with the remid field filled in. */
5639
5640 /* needs at least dlm_rcom + rcom_lock */
5641 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
5642 {
5643         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5644         struct dlm_rsb *r;
5645         struct dlm_lkb *lkb;
5646         uint32_t remid = 0;
5647         int from_nodeid = rc->rc_header.h_nodeid;
5648         int error;
5649
5650         if (rl->rl_parent_lkid) {
5651                 error = -EOPNOTSUPP;
5652                 goto out;
5653         }
5654
5655         remid = le32_to_cpu(rl->rl_lkid);
5656
5657         /* In general we expect the rsb returned to be R_MASTER, but we don't
5658            have to require it.  Recovery of masters on one node can overlap
5659            recovery of locks on another node, so one node can send us MSTCPY
5660            locks before we've made ourselves master of this rsb.  We can still
5661            add new MSTCPY locks that we receive here without any harm; when
5662            we make ourselves master, dlm_recover_masters() won't touch the
5663            MSTCPY locks we've received early. */
5664
5665         error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
5666                          from_nodeid, R_RECEIVE_RECOVER, &r);
5667         if (error)
5668                 goto out;
5669
5670         lock_rsb(r);
5671
5672         if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
5673                 log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
5674                           from_nodeid, remid);
5675                 error = -EBADR;
5676                 goto out_unlock;
5677         }
5678
5679         lkb = search_remid(r, from_nodeid, remid);
5680         if (lkb) {
5681                 error = -EEXIST;
5682                 goto out_remid;
5683         }
5684
5685         error = create_lkb(ls, &lkb);
5686         if (error)
5687                 goto out_unlock;
5688
5689         error = receive_rcom_lock_args(ls, lkb, r, rc);
5690         if (error) {
5691                 __put_lkb(ls, lkb);
5692                 goto out_unlock;
5693         }
5694
5695         attach_lkb(r, lkb);
5696         add_lkb(r, lkb, rl->rl_status);
5697         error = 0;
5698         ls->ls_recover_locks_in++;
5699
5700         if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
5701                 rsb_set_flag(r, RSB_RECOVER_GRANT);
5702
5703  out_remid:
5704         /* this is the new value returned to the lock holder for
5705            saving in its process-copy lkb */
5706         rl->rl_remid = cpu_to_le32(lkb->lkb_id);
5707
5708         lkb->lkb_recover_seq = ls->ls_recover_seq;
5709
5710  out_unlock:
5711         unlock_rsb(r);
5712         put_rsb(r);
5713  out:
5714         if (error && error != -EEXIST)
5715                 log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d",
5716                           from_nodeid, remid, error);
5717         rl->rl_result = cpu_to_le32(error);
5718         return error;
5719 }
5720
5721 /* needs at least dlm_rcom + rcom_lock */
5722 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
5723 {
5724         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5725         struct dlm_rsb *r;
5726         struct dlm_lkb *lkb;
5727         uint32_t lkid, remid;
5728         int error, result;
5729
5730         lkid = le32_to_cpu(rl->rl_lkid);
5731         remid = le32_to_cpu(rl->rl_remid);
5732         result = le32_to_cpu(rl->rl_result);
5733
5734         error = find_lkb(ls, lkid, &lkb);
5735         if (error) {
5736                 log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
5737                           lkid, rc->rc_header.h_nodeid, remid, result);
5738                 return error;
5739         }
5740
5741         r = lkb->lkb_resource;
5742         hold_rsb(r);
5743         lock_rsb(r);
5744
5745         if (!is_process_copy(lkb)) {
5746                 log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
5747                           lkid, rc->rc_header.h_nodeid, remid, result);
5748                 dlm_dump_rsb(r);
5749                 unlock_rsb(r);
5750                 put_rsb(r);
5751                 dlm_put_lkb(lkb);
5752                 return -EINVAL;
5753         }
5754
5755         switch (result) {
5756         case -EBADR:
5757                 /* There's a chance the new master received our lock before
5758                    dlm_recover_master_reply(), this wouldn't happen if we did
5759                    a barrier between recover_masters and recover_locks. */
5760
5761                 log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
5762                           lkid, rc->rc_header.h_nodeid, remid, result);
5763         
5764                 dlm_send_rcom_lock(r, lkb);
5765                 goto out;
5766         case -EEXIST:
5767         case 0:
5768                 lkb->lkb_remid = remid;
5769                 break;
5770         default:
5771                 log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
5772                           lkid, rc->rc_header.h_nodeid, remid, result);
5773         }
5774
5775         /* an ack for dlm_recover_locks() which waits for replies from
5776            all the locks it sends to new masters */
5777         dlm_recovered_lock(r);
5778  out:
5779         unlock_rsb(r);
5780         put_rsb(r);
5781         dlm_put_lkb(lkb);
5782
5783         return 0;
5784 }
5785
5786 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
5787                      int mode, uint32_t flags, void *name, unsigned int namelen,
5788                      unsigned long timeout_cs)
5789 {
5790         struct dlm_lkb *lkb;
5791         struct dlm_args args;
5792         int error;
5793
5794         dlm_lock_recovery(ls);
5795
5796         error = create_lkb(ls, &lkb);
5797         if (error) {
5798                 kfree(ua);
5799                 goto out;
5800         }
5801
5802         if (flags & DLM_LKF_VALBLK) {
5803                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5804                 if (!ua->lksb.sb_lvbptr) {
5805                         kfree(ua);
5806                         __put_lkb(ls, lkb);
5807                         error = -ENOMEM;
5808                         goto out;
5809                 }
5810         }
5811         error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
5812                               fake_astfn, ua, fake_bastfn, &args);
5813         if (error) {
5814                 kfree(ua->lksb.sb_lvbptr);
5815                 ua->lksb.sb_lvbptr = NULL;
5816                 kfree(ua);
5817                 __put_lkb(ls, lkb);
5818                 goto out;
5819         }
5820
5821         /* After ua is attached to lkb it will be freed by dlm_free_lkb().
5822            When DLM_IFL_USER is set, the dlm knows that this is a userspace
5823            lock and that lkb_astparam is the dlm_user_args structure. */
5824         lkb->lkb_flags |= DLM_IFL_USER;
5825         error = request_lock(ls, lkb, name, namelen, &args);
5826
5827         switch (error) {
5828         case 0:
5829                 break;
5830         case -EINPROGRESS:
5831                 error = 0;
5832                 break;
5833         case -EAGAIN:
5834                 error = 0;
5835                 /* fall through */
5836         default:
5837                 __put_lkb(ls, lkb);
5838                 goto out;
5839         }
5840
5841         /* add this new lkb to the per-process list of locks */
5842         spin_lock(&ua->proc->locks_spin);
5843         hold_lkb(lkb);
5844         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5845         spin_unlock(&ua->proc->locks_spin);
5846  out:
5847         dlm_unlock_recovery(ls);
5848         return error;
5849 }
5850
5851 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5852                      int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
5853                      unsigned long timeout_cs)
5854 {
5855         struct dlm_lkb *lkb;
5856         struct dlm_args args;
5857         struct dlm_user_args *ua;
5858         int error;
5859
5860         dlm_lock_recovery(ls);
5861
5862         error = find_lkb(ls, lkid, &lkb);
5863         if (error)
5864                 goto out;
5865
5866         /* user can change the params on its lock when it converts it, or
5867            add an lvb that didn't exist before */
5868
5869         ua = lkb->lkb_ua;
5870
5871         if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
5872                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5873                 if (!ua->lksb.sb_lvbptr) {
5874                         error = -ENOMEM;
5875                         goto out_put;
5876                 }
5877         }
5878         if (lvb_in && ua->lksb.sb_lvbptr)
5879                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5880
5881         ua->xid = ua_tmp->xid;
5882         ua->castparam = ua_tmp->castparam;
5883         ua->castaddr = ua_tmp->castaddr;
5884         ua->bastparam = ua_tmp->bastparam;
5885         ua->bastaddr = ua_tmp->bastaddr;
5886         ua->user_lksb = ua_tmp->user_lksb;
5887
5888         error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
5889                               fake_astfn, ua, fake_bastfn, &args);
5890         if (error)
5891                 goto out_put;
5892
5893         error = convert_lock(ls, lkb, &args);
5894
5895         if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
5896                 error = 0;
5897  out_put:
5898         dlm_put_lkb(lkb);
5899  out:
5900         dlm_unlock_recovery(ls);
5901         kfree(ua_tmp);
5902         return error;
5903 }
5904
5905 /*
5906  * The caller asks for an orphan lock on a given resource with a given mode.
5907  * If a matching lock exists, it's moved to the owner's list of locks and
5908  * the lkid is returned.
5909  */
5910
5911 int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5912                      int mode, uint32_t flags, void *name, unsigned int namelen,
5913                      unsigned long timeout_cs, uint32_t *lkid)
5914 {
5915         struct dlm_lkb *lkb = NULL, *iter;
5916         struct dlm_user_args *ua;
5917         int found_other_mode = 0;
5918         int rv = 0;
5919
5920         mutex_lock(&ls->ls_orphans_mutex);
5921         list_for_each_entry(iter, &ls->ls_orphans, lkb_ownqueue) {
5922                 if (iter->lkb_resource->res_length != namelen)
5923                         continue;
5924                 if (memcmp(iter->lkb_resource->res_name, name, namelen))
5925                         continue;
5926                 if (iter->lkb_grmode != mode) {
5927                         found_other_mode = 1;
5928                         continue;
5929                 }
5930
5931                 lkb = iter;
5932                 list_del_init(&iter->lkb_ownqueue);
5933                 iter->lkb_flags &= ~DLM_IFL_ORPHAN;
5934                 *lkid = iter->lkb_id;
5935                 break;
5936         }
5937         mutex_unlock(&ls->ls_orphans_mutex);
5938
5939         if (!lkb && found_other_mode) {
5940                 rv = -EAGAIN;
5941                 goto out;
5942         }
5943
5944         if (!lkb) {
5945                 rv = -ENOENT;
5946                 goto out;
5947         }
5948
5949         lkb->lkb_exflags = flags;
5950         lkb->lkb_ownpid = (int) current->pid;
5951
5952         ua = lkb->lkb_ua;
5953
5954         ua->proc = ua_tmp->proc;
5955         ua->xid = ua_tmp->xid;
5956         ua->castparam = ua_tmp->castparam;
5957         ua->castaddr = ua_tmp->castaddr;
5958         ua->bastparam = ua_tmp->bastparam;
5959         ua->bastaddr = ua_tmp->bastaddr;
5960         ua->user_lksb = ua_tmp->user_lksb;
5961
5962         /*
5963          * The lkb reference from the ls_orphans list was not
5964          * removed above, and is now considered the reference
5965          * for the proc locks list.
5966          */
5967
5968         spin_lock(&ua->proc->locks_spin);
5969         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5970         spin_unlock(&ua->proc->locks_spin);
5971  out:
5972         kfree(ua_tmp);
5973         return rv;
5974 }
5975
5976 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5977                     uint32_t flags, uint32_t lkid, char *lvb_in)
5978 {
5979         struct dlm_lkb *lkb;
5980         struct dlm_args args;
5981         struct dlm_user_args *ua;
5982         int error;
5983
5984         dlm_lock_recovery(ls);
5985
5986         error = find_lkb(ls, lkid, &lkb);
5987         if (error)
5988                 goto out;
5989
5990         ua = lkb->lkb_ua;
5991
5992         if (lvb_in && ua->lksb.sb_lvbptr)
5993                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5994         if (ua_tmp->castparam)
5995                 ua->castparam = ua_tmp->castparam;
5996         ua->user_lksb = ua_tmp->user_lksb;
5997
5998         error = set_unlock_args(flags, ua, &args);
5999         if (error)
6000                 goto out_put;
6001
6002         error = unlock_lock(ls, lkb, &args);
6003
6004         if (error == -DLM_EUNLOCK)
6005                 error = 0;
6006         /* from validate_unlock_args() */
6007         if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
6008                 error = 0;
6009         if (error)
6010                 goto out_put;
6011
6012         spin_lock(&ua->proc->locks_spin);
6013         /* dlm_user_add_cb() may have already taken lkb off the proc list */
6014         if (!list_empty(&lkb->lkb_ownqueue))
6015                 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
6016         spin_unlock(&ua->proc->locks_spin);
6017  out_put:
6018         dlm_put_lkb(lkb);
6019  out:
6020         dlm_unlock_recovery(ls);
6021         kfree(ua_tmp);
6022         return error;
6023 }
6024
6025 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
6026                     uint32_t flags, uint32_t lkid)
6027 {
6028         struct dlm_lkb *lkb;
6029         struct dlm_args args;
6030         struct dlm_user_args *ua;
6031         int error;
6032
6033         dlm_lock_recovery(ls);
6034
6035         error = find_lkb(ls, lkid, &lkb);
6036         if (error)
6037                 goto out;
6038
6039         ua = lkb->lkb_ua;
6040         if (ua_tmp->castparam)
6041                 ua->castparam = ua_tmp->castparam;
6042         ua->user_lksb = ua_tmp->user_lksb;
6043
6044         error = set_unlock_args(flags, ua, &args);
6045         if (error)
6046                 goto out_put;
6047
6048         error = cancel_lock(ls, lkb, &args);
6049
6050         if (error == -DLM_ECANCEL)
6051                 error = 0;
6052         /* from validate_unlock_args() */
6053         if (error == -EBUSY)
6054                 error = 0;
6055  out_put:
6056         dlm_put_lkb(lkb);
6057  out:
6058         dlm_unlock_recovery(ls);
6059         kfree(ua_tmp);
6060         return error;
6061 }
6062
6063 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
6064 {
6065         struct dlm_lkb *lkb;
6066         struct dlm_args args;
6067         struct dlm_user_args *ua;
6068         struct dlm_rsb *r;
6069         int error;
6070
6071         dlm_lock_recovery(ls);
6072
6073         error = find_lkb(ls, lkid, &lkb);
6074         if (error)
6075                 goto out;
6076
6077         ua = lkb->lkb_ua;
6078
6079         error = set_unlock_args(flags, ua, &args);
6080         if (error)
6081                 goto out_put;
6082
6083         /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
6084
6085         r = lkb->lkb_resource;
6086         hold_rsb(r);
6087         lock_rsb(r);
6088
6089         error = validate_unlock_args(lkb, &args);
6090         if (error)
6091                 goto out_r;
6092         lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
6093
6094         error = _cancel_lock(r, lkb);
6095  out_r:
6096         unlock_rsb(r);
6097         put_rsb(r);
6098
6099         if (error == -DLM_ECANCEL)
6100                 error = 0;
6101         /* from validate_unlock_args() */
6102         if (error == -EBUSY)
6103                 error = 0;
6104  out_put:
6105         dlm_put_lkb(lkb);
6106  out:
6107         dlm_unlock_recovery(ls);
6108         return error;
6109 }
6110
6111 /* lkb's that are removed from the waiters list by revert are just left on the
6112    orphans list with the granted orphan locks, to be freed by purge */
6113
6114 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6115 {
6116         struct dlm_args args;
6117         int error;
6118
6119         hold_lkb(lkb); /* reference for the ls_orphans list */
6120         mutex_lock(&ls->ls_orphans_mutex);
6121         list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
6122         mutex_unlock(&ls->ls_orphans_mutex);
6123
6124         set_unlock_args(0, lkb->lkb_ua, &args);
6125
6126         error = cancel_lock(ls, lkb, &args);
6127         if (error == -DLM_ECANCEL)
6128                 error = 0;
6129         return error;
6130 }
6131
6132 /* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't
6133    granted.  Regardless of what rsb queue the lock is on, it's removed and
6134    freed.  The IVVALBLK flag causes the lvb on the resource to be invalidated
6135    if our lock is PW/EX (it's ignored if our granted mode is smaller.) */
6136
6137 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6138 {
6139         struct dlm_args args;
6140         int error;
6141
6142         set_unlock_args(DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK,
6143                         lkb->lkb_ua, &args);
6144
6145         error = unlock_lock(ls, lkb, &args);
6146         if (error == -DLM_EUNLOCK)
6147                 error = 0;
6148         return error;
6149 }
6150
6151 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
6152    (which does lock_rsb) due to deadlock with receiving a message that does
6153    lock_rsb followed by dlm_user_add_cb() */
6154
6155 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
6156                                      struct dlm_user_proc *proc)
6157 {
6158         struct dlm_lkb *lkb = NULL;
6159
6160         mutex_lock(&ls->ls_clear_proc_locks);
6161         if (list_empty(&proc->locks))
6162                 goto out;
6163
6164         lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
6165         list_del_init(&lkb->lkb_ownqueue);
6166
6167         if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6168                 lkb->lkb_flags |= DLM_IFL_ORPHAN;
6169         else
6170                 lkb->lkb_flags |= DLM_IFL_DEAD;
6171  out:
6172         mutex_unlock(&ls->ls_clear_proc_locks);
6173         return lkb;
6174 }
6175
6176 /* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
6177    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
6178    which we clear here. */
6179
6180 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
6181    list, and no more device_writes should add lkb's to proc->locks list; so we
6182    shouldn't need to take asts_spin or locks_spin here.  this assumes that
6183    device reads/writes/closes are serialized -- FIXME: we may need to serialize
6184    them ourself. */
6185
6186 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6187 {
6188         struct dlm_lkb *lkb, *safe;
6189
6190         dlm_lock_recovery(ls);
6191
6192         while (1) {
6193                 lkb = del_proc_lock(ls, proc);
6194                 if (!lkb)
6195                         break;
6196                 del_timeout(lkb);
6197                 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6198                         orphan_proc_lock(ls, lkb);
6199                 else
6200                         unlock_proc_lock(ls, lkb);
6201
6202                 /* this removes the reference for the proc->locks list
6203                    added by dlm_user_request, it may result in the lkb
6204                    being freed */
6205
6206                 dlm_put_lkb(lkb);
6207         }
6208
6209         mutex_lock(&ls->ls_clear_proc_locks);
6210
6211         /* in-progress unlocks */
6212         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6213                 list_del_init(&lkb->lkb_ownqueue);
6214                 lkb->lkb_flags |= DLM_IFL_DEAD;
6215                 dlm_put_lkb(lkb);
6216         }
6217
6218         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
6219                 memset(&lkb->lkb_callbacks, 0,
6220                        sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
6221                 list_del_init(&lkb->lkb_cb_list);
6222                 dlm_put_lkb(lkb);
6223         }
6224
6225         mutex_unlock(&ls->ls_clear_proc_locks);
6226         dlm_unlock_recovery(ls);
6227 }
6228
6229 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6230 {
6231         struct dlm_lkb *lkb, *safe;
6232
6233         while (1) {
6234                 lkb = NULL;
6235                 spin_lock(&proc->locks_spin);
6236                 if (!list_empty(&proc->locks)) {
6237                         lkb = list_entry(proc->locks.next, struct dlm_lkb,
6238                                          lkb_ownqueue);
6239                         list_del_init(&lkb->lkb_ownqueue);
6240                 }
6241                 spin_unlock(&proc->locks_spin);
6242
6243                 if (!lkb)
6244                         break;
6245
6246                 lkb->lkb_flags |= DLM_IFL_DEAD;
6247                 unlock_proc_lock(ls, lkb);
6248                 dlm_put_lkb(lkb); /* ref from proc->locks list */
6249         }
6250
6251         spin_lock(&proc->locks_spin);
6252         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6253                 list_del_init(&lkb->lkb_ownqueue);
6254                 lkb->lkb_flags |= DLM_IFL_DEAD;
6255                 dlm_put_lkb(lkb);
6256         }
6257         spin_unlock(&proc->locks_spin);
6258
6259         spin_lock(&proc->asts_spin);
6260         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
6261                 memset(&lkb->lkb_callbacks, 0,
6262                        sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
6263                 list_del_init(&lkb->lkb_cb_list);
6264                 dlm_put_lkb(lkb);
6265         }
6266         spin_unlock(&proc->asts_spin);
6267 }
6268
6269 /* pid of 0 means purge all orphans */
6270
6271 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
6272 {
6273         struct dlm_lkb *lkb, *safe;
6274
6275         mutex_lock(&ls->ls_orphans_mutex);
6276         list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
6277                 if (pid && lkb->lkb_ownpid != pid)
6278                         continue;
6279                 unlock_proc_lock(ls, lkb);
6280                 list_del_init(&lkb->lkb_ownqueue);
6281                 dlm_put_lkb(lkb);
6282         }
6283         mutex_unlock(&ls->ls_orphans_mutex);
6284 }
6285
6286 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
6287 {
6288         struct dlm_message *ms;
6289         struct dlm_mhandle *mh;
6290         int error;
6291
6292         error = _create_message(ls, sizeof(struct dlm_message), nodeid,
6293                                 DLM_MSG_PURGE, &ms, &mh);
6294         if (error)
6295                 return error;
6296         ms->m_nodeid = nodeid;
6297         ms->m_pid = pid;
6298
6299         return send_message(mh, ms);
6300 }
6301
6302 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
6303                    int nodeid, int pid)
6304 {
6305         int error = 0;
6306
6307         if (nodeid && (nodeid != dlm_our_nodeid())) {
6308                 error = send_purge(ls, nodeid, pid);
6309         } else {
6310                 dlm_lock_recovery(ls);
6311                 if (pid == current->pid)
6312                         purge_proc_locks(ls, proc);
6313                 else
6314                         do_purge(ls, nodeid, pid);
6315                 dlm_unlock_recovery(ls);
6316         }
6317         return error;
6318 }
6319