GNU Linux-libre 6.9.1-gnu
[releases.git] / fs / dlm / lock.c
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
6 **
7 **
8 *******************************************************************************
9 ******************************************************************************/
10
11 /* Central locking logic has four stages:
12
13    dlm_lock()
14    dlm_unlock()
15
16    request_lock(ls, lkb)
17    convert_lock(ls, lkb)
18    unlock_lock(ls, lkb)
19    cancel_lock(ls, lkb)
20
21    _request_lock(r, lkb)
22    _convert_lock(r, lkb)
23    _unlock_lock(r, lkb)
24    _cancel_lock(r, lkb)
25
26    do_request(r, lkb)
27    do_convert(r, lkb)
28    do_unlock(r, lkb)
29    do_cancel(r, lkb)
30
31    Stage 1 (lock, unlock) is mainly about checking input args and
32    splitting into one of the four main operations:
33
34        dlm_lock          = request_lock
35        dlm_lock+CONVERT  = convert_lock
36        dlm_unlock        = unlock_lock
37        dlm_unlock+CANCEL = cancel_lock
38
39    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
40    provided to the next stage.
41
42    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
43    When remote, it calls send_xxxx(), when local it calls do_xxxx().
44
45    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
46    given rsb and lkb and queues callbacks.
47
48    For remote operations, send_xxxx() results in the corresponding do_xxxx()
49    function being executed on the remote node.  The connecting send/receive
50    calls on local (L) and remote (R) nodes:
51
52    L: send_xxxx()              ->  R: receive_xxxx()
53                                    R: do_xxxx()
54    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
55 */
56 #include <trace/events/dlm.h>
57
58 #include <linux/types.h>
59 #include <linux/rbtree.h>
60 #include <linux/slab.h>
61 #include "dlm_internal.h"
62 #include <linux/dlm_device.h>
63 #include "memory.h"
64 #include "midcomms.h"
65 #include "requestqueue.h"
66 #include "util.h"
67 #include "dir.h"
68 #include "member.h"
69 #include "lockspace.h"
70 #include "ast.h"
71 #include "lock.h"
72 #include "rcom.h"
73 #include "recover.h"
74 #include "lvb_table.h"
75 #include "user.h"
76 #include "config.h"
77
78 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
82 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
84 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int send_remove(struct dlm_rsb *r);
86 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
88 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
89                                     const struct dlm_message *ms, bool local);
90 static int receive_extralen(const struct dlm_message *ms);
91 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
92 static void toss_rsb(struct kref *kref);
93
94 /*
95  * Lock compatibilty matrix - thanks Steve
96  * UN = Unlocked state. Not really a state, used as a flag
97  * PD = Padding. Used to make the matrix a nice power of two in size
98  * Other states are the same as the VMS DLM.
99  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
100  */
101
102 static const int __dlm_compat_matrix[8][8] = {
103       /* UN NL CR CW PR PW EX PD */
104         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
105         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
106         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
107         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
108         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
109         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
110         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
111         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
112 };
113
114 /*
115  * This defines the direction of transfer of LVB data.
116  * Granted mode is the row; requested mode is the column.
117  * Usage: matrix[grmode+1][rqmode+1]
118  * 1 = LVB is returned to the caller
119  * 0 = LVB is written to the resource
120  * -1 = nothing happens to the LVB
121  */
122
123 const int dlm_lvb_operations[8][8] = {
124         /* UN   NL  CR  CW  PR  PW  EX  PD*/
125         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
126         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
127         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
128         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
129         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
130         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
131         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
132         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
133 };
134
135 #define modes_compat(gr, rq) \
136         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
137
138 int dlm_modes_compat(int mode1, int mode2)
139 {
140         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
141 }
142
143 /*
144  * Compatibility matrix for conversions with QUECVT set.
145  * Granted mode is the row; requested mode is the column.
146  * Usage: matrix[grmode+1][rqmode+1]
147  */
148
149 static const int __quecvt_compat_matrix[8][8] = {
150       /* UN NL CR CW PR PW EX PD */
151         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
152         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
153         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
154         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
155         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
156         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
157         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
158         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
159 };
160
161 void dlm_print_lkb(struct dlm_lkb *lkb)
162 {
163         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
164                "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
165                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
166                dlm_iflags_val(lkb), lkb->lkb_status, lkb->lkb_rqmode,
167                lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
168                (unsigned long long)lkb->lkb_recover_seq);
169 }
170
171 static void dlm_print_rsb(struct dlm_rsb *r)
172 {
173         printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
174                "rlc %d name %s\n",
175                r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
176                r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
177                r->res_name);
178 }
179
180 void dlm_dump_rsb(struct dlm_rsb *r)
181 {
182         struct dlm_lkb *lkb;
183
184         dlm_print_rsb(r);
185
186         printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
187                list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
188         printk(KERN_ERR "rsb lookup list\n");
189         list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
190                 dlm_print_lkb(lkb);
191         printk(KERN_ERR "rsb grant queue:\n");
192         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
193                 dlm_print_lkb(lkb);
194         printk(KERN_ERR "rsb convert queue:\n");
195         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
196                 dlm_print_lkb(lkb);
197         printk(KERN_ERR "rsb wait queue:\n");
198         list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
199                 dlm_print_lkb(lkb);
200 }
201
202 /* Threads cannot use the lockspace while it's being recovered */
203
204 static inline void dlm_lock_recovery(struct dlm_ls *ls)
205 {
206         down_read(&ls->ls_in_recovery);
207 }
208
209 void dlm_unlock_recovery(struct dlm_ls *ls)
210 {
211         up_read(&ls->ls_in_recovery);
212 }
213
214 int dlm_lock_recovery_try(struct dlm_ls *ls)
215 {
216         return down_read_trylock(&ls->ls_in_recovery);
217 }
218
219 static inline int can_be_queued(struct dlm_lkb *lkb)
220 {
221         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
222 }
223
224 static inline int force_blocking_asts(struct dlm_lkb *lkb)
225 {
226         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
227 }
228
229 static inline int is_demoted(struct dlm_lkb *lkb)
230 {
231         return test_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
232 }
233
234 static inline int is_altmode(struct dlm_lkb *lkb)
235 {
236         return test_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
237 }
238
239 static inline int is_granted(struct dlm_lkb *lkb)
240 {
241         return (lkb->lkb_status == DLM_LKSTS_GRANTED);
242 }
243
244 static inline int is_remote(struct dlm_rsb *r)
245 {
246         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
247         return !!r->res_nodeid;
248 }
249
250 static inline int is_process_copy(struct dlm_lkb *lkb)
251 {
252         return lkb->lkb_nodeid &&
253                !test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
254 }
255
256 static inline int is_master_copy(struct dlm_lkb *lkb)
257 {
258         return test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
259 }
260
261 static inline int middle_conversion(struct dlm_lkb *lkb)
262 {
263         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
264             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
265                 return 1;
266         return 0;
267 }
268
269 static inline int down_conversion(struct dlm_lkb *lkb)
270 {
271         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
272 }
273
274 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
275 {
276         return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
277 }
278
279 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
280 {
281         return test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
282 }
283
284 static inline int is_overlap(struct dlm_lkb *lkb)
285 {
286         return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags) ||
287                test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
288 }
289
290 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
291 {
292         if (is_master_copy(lkb))
293                 return;
294
295         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
296
297         if (rv == -DLM_ECANCEL &&
298             test_and_clear_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags))
299                 rv = -EDEADLK;
300
301         dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, dlm_sbflags_val(lkb));
302 }
303
304 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
305 {
306         queue_cast(r, lkb,
307                    is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
308 }
309
310 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
311 {
312         if (is_master_copy(lkb)) {
313                 send_bast(r, lkb, rqmode);
314         } else {
315                 dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
316         }
317 }
318
319 /*
320  * Basic operations on rsb's and lkb's
321  */
322
323 /* This is only called to add a reference when the code already holds
324    a valid reference to the rsb, so there's no need for locking. */
325
326 static inline void hold_rsb(struct dlm_rsb *r)
327 {
328         kref_get(&r->res_ref);
329 }
330
331 void dlm_hold_rsb(struct dlm_rsb *r)
332 {
333         hold_rsb(r);
334 }
335
336 /* When all references to the rsb are gone it's transferred to
337    the tossed list for later disposal. */
338
339 static void put_rsb(struct dlm_rsb *r)
340 {
341         struct dlm_ls *ls = r->res_ls;
342         uint32_t bucket = r->res_bucket;
343         int rv;
344
345         rv = kref_put_lock(&r->res_ref, toss_rsb,
346                            &ls->ls_rsbtbl[bucket].lock);
347         if (rv)
348                 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
349 }
350
351 void dlm_put_rsb(struct dlm_rsb *r)
352 {
353         put_rsb(r);
354 }
355
356 static int pre_rsb_struct(struct dlm_ls *ls)
357 {
358         struct dlm_rsb *r1, *r2;
359         int count = 0;
360
361         spin_lock(&ls->ls_new_rsb_spin);
362         if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
363                 spin_unlock(&ls->ls_new_rsb_spin);
364                 return 0;
365         }
366         spin_unlock(&ls->ls_new_rsb_spin);
367
368         r1 = dlm_allocate_rsb(ls);
369         r2 = dlm_allocate_rsb(ls);
370
371         spin_lock(&ls->ls_new_rsb_spin);
372         if (r1) {
373                 list_add(&r1->res_hashchain, &ls->ls_new_rsb);
374                 ls->ls_new_rsb_count++;
375         }
376         if (r2) {
377                 list_add(&r2->res_hashchain, &ls->ls_new_rsb);
378                 ls->ls_new_rsb_count++;
379         }
380         count = ls->ls_new_rsb_count;
381         spin_unlock(&ls->ls_new_rsb_spin);
382
383         if (!count)
384                 return -ENOMEM;
385         return 0;
386 }
387
388 /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
389    unlock any spinlocks, go back and call pre_rsb_struct again.
390    Otherwise, take an rsb off the list and return it. */
391
392 static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
393                           struct dlm_rsb **r_ret)
394 {
395         struct dlm_rsb *r;
396         int count;
397
398         spin_lock(&ls->ls_new_rsb_spin);
399         if (list_empty(&ls->ls_new_rsb)) {
400                 count = ls->ls_new_rsb_count;
401                 spin_unlock(&ls->ls_new_rsb_spin);
402                 log_debug(ls, "find_rsb retry %d %d %s",
403                           count, dlm_config.ci_new_rsb_count,
404                           (const char *)name);
405                 return -EAGAIN;
406         }
407
408         r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
409         list_del(&r->res_hashchain);
410         /* Convert the empty list_head to a NULL rb_node for tree usage: */
411         memset(&r->res_hashnode, 0, sizeof(struct rb_node));
412         ls->ls_new_rsb_count--;
413         spin_unlock(&ls->ls_new_rsb_spin);
414
415         r->res_ls = ls;
416         r->res_length = len;
417         memcpy(r->res_name, name, len);
418         mutex_init(&r->res_mutex);
419
420         INIT_LIST_HEAD(&r->res_lookup);
421         INIT_LIST_HEAD(&r->res_grantqueue);
422         INIT_LIST_HEAD(&r->res_convertqueue);
423         INIT_LIST_HEAD(&r->res_waitqueue);
424         INIT_LIST_HEAD(&r->res_root_list);
425         INIT_LIST_HEAD(&r->res_recover_list);
426
427         *r_ret = r;
428         return 0;
429 }
430
431 static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
432 {
433         char maxname[DLM_RESNAME_MAXLEN];
434
435         memset(maxname, 0, DLM_RESNAME_MAXLEN);
436         memcpy(maxname, name, nlen);
437         return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
438 }
439
440 int dlm_search_rsb_tree(struct rb_root *tree, const void *name, int len,
441                         struct dlm_rsb **r_ret)
442 {
443         struct rb_node *node = tree->rb_node;
444         struct dlm_rsb *r;
445         int rc;
446
447         while (node) {
448                 r = rb_entry(node, struct dlm_rsb, res_hashnode);
449                 rc = rsb_cmp(r, name, len);
450                 if (rc < 0)
451                         node = node->rb_left;
452                 else if (rc > 0)
453                         node = node->rb_right;
454                 else
455                         goto found;
456         }
457         *r_ret = NULL;
458         return -EBADR;
459
460  found:
461         *r_ret = r;
462         return 0;
463 }
464
465 static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
466 {
467         struct rb_node **newn = &tree->rb_node;
468         struct rb_node *parent = NULL;
469         int rc;
470
471         while (*newn) {
472                 struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
473                                                res_hashnode);
474
475                 parent = *newn;
476                 rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
477                 if (rc < 0)
478                         newn = &parent->rb_left;
479                 else if (rc > 0)
480                         newn = &parent->rb_right;
481                 else {
482                         log_print("rsb_insert match");
483                         dlm_dump_rsb(rsb);
484                         dlm_dump_rsb(cur);
485                         return -EEXIST;
486                 }
487         }
488
489         rb_link_node(&rsb->res_hashnode, parent, newn);
490         rb_insert_color(&rsb->res_hashnode, tree);
491         return 0;
492 }
493
494 /*
495  * Find rsb in rsbtbl and potentially create/add one
496  *
497  * Delaying the release of rsb's has a similar benefit to applications keeping
498  * NL locks on an rsb, but without the guarantee that the cached master value
499  * will still be valid when the rsb is reused.  Apps aren't always smart enough
500  * to keep NL locks on an rsb that they may lock again shortly; this can lead
501  * to excessive master lookups and removals if we don't delay the release.
502  *
503  * Searching for an rsb means looking through both the normal list and toss
504  * list.  When found on the toss list the rsb is moved to the normal list with
505  * ref count of 1; when found on normal list the ref count is incremented.
506  *
507  * rsb's on the keep list are being used locally and refcounted.
508  * rsb's on the toss list are not being used locally, and are not refcounted.
509  *
510  * The toss list rsb's were either
511  * - previously used locally but not any more (were on keep list, then
512  *   moved to toss list when last refcount dropped)
513  * - created and put on toss list as a directory record for a lookup
514  *   (we are the dir node for the res, but are not using the res right now,
515  *   but some other node is)
516  *
517  * The purpose of find_rsb() is to return a refcounted rsb for local use.
518  * So, if the given rsb is on the toss list, it is moved to the keep list
519  * before being returned.
520  *
521  * toss_rsb() happens when all local usage of the rsb is done, i.e. no
522  * more refcounts exist, so the rsb is moved from the keep list to the
523  * toss list.
524  *
525  * rsb's on both keep and toss lists are used for doing a name to master
526  * lookups.  rsb's that are in use locally (and being refcounted) are on
527  * the keep list, rsb's that are not in use locally (not refcounted) and
528  * only exist for name/master lookups are on the toss list.
529  *
530  * rsb's on the toss list who's dir_nodeid is not local can have stale
531  * name/master mappings.  So, remote requests on such rsb's can potentially
532  * return with an error, which means the mapping is stale and needs to
533  * be updated with a new lookup.  (The idea behind MASTER UNCERTAIN and
534  * first_lkid is to keep only a single outstanding request on an rsb
535  * while that rsb has a potentially stale master.)
536  */
537
538 static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
539                         uint32_t hash, uint32_t b,
540                         int dir_nodeid, int from_nodeid,
541                         unsigned int flags, struct dlm_rsb **r_ret)
542 {
543         struct dlm_rsb *r = NULL;
544         int our_nodeid = dlm_our_nodeid();
545         int from_local = 0;
546         int from_other = 0;
547         int from_dir = 0;
548         int create = 0;
549         int error;
550
551         if (flags & R_RECEIVE_REQUEST) {
552                 if (from_nodeid == dir_nodeid)
553                         from_dir = 1;
554                 else
555                         from_other = 1;
556         } else if (flags & R_REQUEST) {
557                 from_local = 1;
558         }
559
560         /*
561          * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
562          * from_nodeid has sent us a lock in dlm_recover_locks, believing
563          * we're the new master.  Our local recovery may not have set
564          * res_master_nodeid to our_nodeid yet, so allow either.  Don't
565          * create the rsb; dlm_recover_process_copy() will handle EBADR
566          * by resending.
567          *
568          * If someone sends us a request, we are the dir node, and we do
569          * not find the rsb anywhere, then recreate it.  This happens if
570          * someone sends us a request after we have removed/freed an rsb
571          * from our toss list.  (They sent a request instead of lookup
572          * because they are using an rsb from their toss list.)
573          */
574
575         if (from_local || from_dir ||
576             (from_other && (dir_nodeid == our_nodeid))) {
577                 create = 1;
578         }
579
580  retry:
581         if (create) {
582                 error = pre_rsb_struct(ls);
583                 if (error < 0)
584                         goto out;
585         }
586
587         spin_lock(&ls->ls_rsbtbl[b].lock);
588
589         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
590         if (error)
591                 goto do_toss;
592         
593         /*
594          * rsb is active, so we can't check master_nodeid without lock_rsb.
595          */
596
597         kref_get(&r->res_ref);
598         goto out_unlock;
599
600
601  do_toss:
602         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
603         if (error)
604                 goto do_new;
605
606         /*
607          * rsb found inactive (master_nodeid may be out of date unless
608          * we are the dir_nodeid or were the master)  No other thread
609          * is using this rsb because it's on the toss list, so we can
610          * look at or update res_master_nodeid without lock_rsb.
611          */
612
613         if ((r->res_master_nodeid != our_nodeid) && from_other) {
614                 /* our rsb was not master, and another node (not the dir node)
615                    has sent us a request */
616                 log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s",
617                           from_nodeid, r->res_master_nodeid, dir_nodeid,
618                           r->res_name);
619                 error = -ENOTBLK;
620                 goto out_unlock;
621         }
622
623         if ((r->res_master_nodeid != our_nodeid) && from_dir) {
624                 /* don't think this should ever happen */
625                 log_error(ls, "find_rsb toss from_dir %d master %d",
626                           from_nodeid, r->res_master_nodeid);
627                 dlm_print_rsb(r);
628                 /* fix it and go on */
629                 r->res_master_nodeid = our_nodeid;
630                 r->res_nodeid = 0;
631                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
632                 r->res_first_lkid = 0;
633         }
634
635         if (from_local && (r->res_master_nodeid != our_nodeid)) {
636                 /* Because we have held no locks on this rsb,
637                    res_master_nodeid could have become stale. */
638                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
639                 r->res_first_lkid = 0;
640         }
641
642         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
643         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
644         goto out_unlock;
645
646
647  do_new:
648         /*
649          * rsb not found
650          */
651
652         if (error == -EBADR && !create)
653                 goto out_unlock;
654
655         error = get_rsb_struct(ls, name, len, &r);
656         if (error == -EAGAIN) {
657                 spin_unlock(&ls->ls_rsbtbl[b].lock);
658                 goto retry;
659         }
660         if (error)
661                 goto out_unlock;
662
663         r->res_hash = hash;
664         r->res_bucket = b;
665         r->res_dir_nodeid = dir_nodeid;
666         kref_init(&r->res_ref);
667
668         if (from_dir) {
669                 /* want to see how often this happens */
670                 log_debug(ls, "find_rsb new from_dir %d recreate %s",
671                           from_nodeid, r->res_name);
672                 r->res_master_nodeid = our_nodeid;
673                 r->res_nodeid = 0;
674                 goto out_add;
675         }
676
677         if (from_other && (dir_nodeid != our_nodeid)) {
678                 /* should never happen */
679                 log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
680                           from_nodeid, dir_nodeid, our_nodeid, r->res_name);
681                 dlm_free_rsb(r);
682                 r = NULL;
683                 error = -ENOTBLK;
684                 goto out_unlock;
685         }
686
687         if (from_other) {
688                 log_debug(ls, "find_rsb new from_other %d dir %d %s",
689                           from_nodeid, dir_nodeid, r->res_name);
690         }
691
692         if (dir_nodeid == our_nodeid) {
693                 /* When we are the dir nodeid, we can set the master
694                    node immediately */
695                 r->res_master_nodeid = our_nodeid;
696                 r->res_nodeid = 0;
697         } else {
698                 /* set_master will send_lookup to dir_nodeid */
699                 r->res_master_nodeid = 0;
700                 r->res_nodeid = -1;
701         }
702
703  out_add:
704         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
705  out_unlock:
706         spin_unlock(&ls->ls_rsbtbl[b].lock);
707  out:
708         *r_ret = r;
709         return error;
710 }
711
712 /* During recovery, other nodes can send us new MSTCPY locks (from
713    dlm_recover_locks) before we've made ourself master (in
714    dlm_recover_masters). */
715
716 static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
717                           uint32_t hash, uint32_t b,
718                           int dir_nodeid, int from_nodeid,
719                           unsigned int flags, struct dlm_rsb **r_ret)
720 {
721         struct dlm_rsb *r = NULL;
722         int our_nodeid = dlm_our_nodeid();
723         int recover = (flags & R_RECEIVE_RECOVER);
724         int error;
725
726  retry:
727         error = pre_rsb_struct(ls);
728         if (error < 0)
729                 goto out;
730
731         spin_lock(&ls->ls_rsbtbl[b].lock);
732
733         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
734         if (error)
735                 goto do_toss;
736
737         /*
738          * rsb is active, so we can't check master_nodeid without lock_rsb.
739          */
740
741         kref_get(&r->res_ref);
742         goto out_unlock;
743
744
745  do_toss:
746         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
747         if (error)
748                 goto do_new;
749
750         /*
751          * rsb found inactive. No other thread is using this rsb because
752          * it's on the toss list, so we can look at or update
753          * res_master_nodeid without lock_rsb.
754          */
755
756         if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
757                 /* our rsb is not master, and another node has sent us a
758                    request; this should never happen */
759                 log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d",
760                           from_nodeid, r->res_master_nodeid, dir_nodeid);
761                 dlm_print_rsb(r);
762                 error = -ENOTBLK;
763                 goto out_unlock;
764         }
765
766         if (!recover && (r->res_master_nodeid != our_nodeid) &&
767             (dir_nodeid == our_nodeid)) {
768                 /* our rsb is not master, and we are dir; may as well fix it;
769                    this should never happen */
770                 log_error(ls, "find_rsb toss our %d master %d dir %d",
771                           our_nodeid, r->res_master_nodeid, dir_nodeid);
772                 dlm_print_rsb(r);
773                 r->res_master_nodeid = our_nodeid;
774                 r->res_nodeid = 0;
775         }
776
777         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
778         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
779         goto out_unlock;
780
781
782  do_new:
783         /*
784          * rsb not found
785          */
786
787         error = get_rsb_struct(ls, name, len, &r);
788         if (error == -EAGAIN) {
789                 spin_unlock(&ls->ls_rsbtbl[b].lock);
790                 goto retry;
791         }
792         if (error)
793                 goto out_unlock;
794
795         r->res_hash = hash;
796         r->res_bucket = b;
797         r->res_dir_nodeid = dir_nodeid;
798         r->res_master_nodeid = dir_nodeid;
799         r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
800         kref_init(&r->res_ref);
801
802         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
803  out_unlock:
804         spin_unlock(&ls->ls_rsbtbl[b].lock);
805  out:
806         *r_ret = r;
807         return error;
808 }
809
810 static int find_rsb(struct dlm_ls *ls, const void *name, int len,
811                     int from_nodeid, unsigned int flags,
812                     struct dlm_rsb **r_ret)
813 {
814         uint32_t hash, b;
815         int dir_nodeid;
816
817         if (len > DLM_RESNAME_MAXLEN)
818                 return -EINVAL;
819
820         hash = jhash(name, len, 0);
821         b = hash & (ls->ls_rsbtbl_size - 1);
822
823         dir_nodeid = dlm_hash2nodeid(ls, hash);
824
825         if (dlm_no_directory(ls))
826                 return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid,
827                                       from_nodeid, flags, r_ret);
828         else
829                 return find_rsb_dir(ls, name, len, hash, b, dir_nodeid,
830                                       from_nodeid, flags, r_ret);
831 }
832
833 /* we have received a request and found that res_master_nodeid != our_nodeid,
834    so we need to return an error or make ourself the master */
835
836 static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
837                                   int from_nodeid)
838 {
839         if (dlm_no_directory(ls)) {
840                 log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
841                           from_nodeid, r->res_master_nodeid,
842                           r->res_dir_nodeid);
843                 dlm_print_rsb(r);
844                 return -ENOTBLK;
845         }
846
847         if (from_nodeid != r->res_dir_nodeid) {
848                 /* our rsb is not master, and another node (not the dir node)
849                    has sent us a request.  this is much more common when our
850                    master_nodeid is zero, so limit debug to non-zero.  */
851
852                 if (r->res_master_nodeid) {
853                         log_debug(ls, "validate master from_other %d master %d "
854                                   "dir %d first %x %s", from_nodeid,
855                                   r->res_master_nodeid, r->res_dir_nodeid,
856                                   r->res_first_lkid, r->res_name);
857                 }
858                 return -ENOTBLK;
859         } else {
860                 /* our rsb is not master, but the dir nodeid has sent us a
861                    request; this could happen with master 0 / res_nodeid -1 */
862
863                 if (r->res_master_nodeid) {
864                         log_error(ls, "validate master from_dir %d master %d "
865                                   "first %x %s",
866                                   from_nodeid, r->res_master_nodeid,
867                                   r->res_first_lkid, r->res_name);
868                 }
869
870                 r->res_master_nodeid = dlm_our_nodeid();
871                 r->res_nodeid = 0;
872                 return 0;
873         }
874 }
875
876 static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid,
877                                 int from_nodeid, bool toss_list, unsigned int flags,
878                                 int *r_nodeid, int *result)
879 {
880         int fix_master = (flags & DLM_LU_RECOVER_MASTER);
881         int from_master = (flags & DLM_LU_RECOVER_DIR);
882
883         if (r->res_dir_nodeid != our_nodeid) {
884                 /* should not happen, but may as well fix it and carry on */
885                 log_error(ls, "%s res_dir %d our %d %s", __func__,
886                           r->res_dir_nodeid, our_nodeid, r->res_name);
887                 r->res_dir_nodeid = our_nodeid;
888         }
889
890         if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) {
891                 /* Recovery uses this function to set a new master when
892                  * the previous master failed.  Setting NEW_MASTER will
893                  * force dlm_recover_masters to call recover_master on this
894                  * rsb even though the res_nodeid is no longer removed.
895                  */
896
897                 r->res_master_nodeid = from_nodeid;
898                 r->res_nodeid = from_nodeid;
899                 rsb_set_flag(r, RSB_NEW_MASTER);
900
901                 if (toss_list) {
902                         /* I don't think we should ever find it on toss list. */
903                         log_error(ls, "%s fix_master on toss", __func__);
904                         dlm_dump_rsb(r);
905                 }
906         }
907
908         if (from_master && (r->res_master_nodeid != from_nodeid)) {
909                 /* this will happen if from_nodeid became master during
910                  * a previous recovery cycle, and we aborted the previous
911                  * cycle before recovering this master value
912                  */
913
914                 log_limit(ls, "%s from_master %d master_nodeid %d res_nodeid %d first %x %s",
915                           __func__, from_nodeid, r->res_master_nodeid,
916                           r->res_nodeid, r->res_first_lkid, r->res_name);
917
918                 if (r->res_master_nodeid == our_nodeid) {
919                         log_error(ls, "from_master %d our_master", from_nodeid);
920                         dlm_dump_rsb(r);
921                         goto ret_assign;
922                 }
923
924                 r->res_master_nodeid = from_nodeid;
925                 r->res_nodeid = from_nodeid;
926                 rsb_set_flag(r, RSB_NEW_MASTER);
927         }
928
929         if (!r->res_master_nodeid) {
930                 /* this will happen if recovery happens while we're looking
931                  * up the master for this rsb
932                  */
933
934                 log_debug(ls, "%s master 0 to %d first %x %s", __func__,
935                           from_nodeid, r->res_first_lkid, r->res_name);
936                 r->res_master_nodeid = from_nodeid;
937                 r->res_nodeid = from_nodeid;
938         }
939
940         if (!from_master && !fix_master &&
941             (r->res_master_nodeid == from_nodeid)) {
942                 /* this can happen when the master sends remove, the dir node
943                  * finds the rsb on the keep list and ignores the remove,
944                  * and the former master sends a lookup
945                  */
946
947                 log_limit(ls, "%s from master %d flags %x first %x %s",
948                           __func__, from_nodeid, flags, r->res_first_lkid,
949                           r->res_name);
950         }
951
952  ret_assign:
953         *r_nodeid = r->res_master_nodeid;
954         if (result)
955                 *result = DLM_LU_MATCH;
956 }
957
958 /*
959  * We're the dir node for this res and another node wants to know the
960  * master nodeid.  During normal operation (non recovery) this is only
961  * called from receive_lookup(); master lookups when the local node is
962  * the dir node are done by find_rsb().
963  *
964  * normal operation, we are the dir node for a resource
965  * . _request_lock
966  * . set_master
967  * . send_lookup
968  * . receive_lookup
969  * . dlm_master_lookup flags 0
970  *
971  * recover directory, we are rebuilding dir for all resources
972  * . dlm_recover_directory
973  * . dlm_rcom_names
974  *   remote node sends back the rsb names it is master of and we are dir of
975  * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
976  *   we either create new rsb setting remote node as master, or find existing
977  *   rsb and set master to be the remote node.
978  *
979  * recover masters, we are finding the new master for resources
980  * . dlm_recover_masters
981  * . recover_master
982  * . dlm_send_rcom_lookup
983  * . receive_rcom_lookup
984  * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
985  */
986
987 int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
988                       int len, unsigned int flags, int *r_nodeid, int *result)
989 {
990         struct dlm_rsb *r = NULL;
991         uint32_t hash, b;
992         int our_nodeid = dlm_our_nodeid();
993         int dir_nodeid, error;
994
995         if (len > DLM_RESNAME_MAXLEN)
996                 return -EINVAL;
997
998         if (from_nodeid == our_nodeid) {
999                 log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
1000                           our_nodeid, flags);
1001                 return -EINVAL;
1002         }
1003
1004         hash = jhash(name, len, 0);
1005         b = hash & (ls->ls_rsbtbl_size - 1);
1006
1007         dir_nodeid = dlm_hash2nodeid(ls, hash);
1008         if (dir_nodeid != our_nodeid) {
1009                 log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
1010                           from_nodeid, dir_nodeid, our_nodeid, hash,
1011                           ls->ls_num_nodes);
1012                 *r_nodeid = -1;
1013                 return -EINVAL;
1014         }
1015
1016  retry:
1017         error = pre_rsb_struct(ls);
1018         if (error < 0)
1019                 return error;
1020
1021         spin_lock(&ls->ls_rsbtbl[b].lock);
1022         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
1023         if (!error) {
1024                 /* because the rsb is active, we need to lock_rsb before
1025                  * checking/changing re_master_nodeid
1026                  */
1027
1028                 hold_rsb(r);
1029                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1030                 lock_rsb(r);
1031
1032                 __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
1033                                     flags, r_nodeid, result);
1034
1035                 /* the rsb was active */
1036                 unlock_rsb(r);
1037                 put_rsb(r);
1038
1039                 return 0;
1040         }
1041
1042         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1043         if (error)
1044                 goto not_found;
1045
1046         /* because the rsb is inactive (on toss list), it's not refcounted
1047          * and lock_rsb is not used, but is protected by the rsbtbl lock
1048          */
1049
1050         __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags,
1051                             r_nodeid, result);
1052
1053         r->res_toss_time = jiffies;
1054         /* the rsb was inactive (on toss list) */
1055         spin_unlock(&ls->ls_rsbtbl[b].lock);
1056
1057         return 0;
1058
1059  not_found:
1060         error = get_rsb_struct(ls, name, len, &r);
1061         if (error == -EAGAIN) {
1062                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1063                 goto retry;
1064         }
1065         if (error)
1066                 goto out_unlock;
1067
1068         r->res_hash = hash;
1069         r->res_bucket = b;
1070         r->res_dir_nodeid = our_nodeid;
1071         r->res_master_nodeid = from_nodeid;
1072         r->res_nodeid = from_nodeid;
1073         kref_init(&r->res_ref);
1074         r->res_toss_time = jiffies;
1075
1076         error = rsb_insert(r, &ls->ls_rsbtbl[b].toss);
1077         if (error) {
1078                 /* should never happen */
1079                 dlm_free_rsb(r);
1080                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1081                 goto retry;
1082         }
1083
1084         if (result)
1085                 *result = DLM_LU_ADD;
1086         *r_nodeid = from_nodeid;
1087  out_unlock:
1088         spin_unlock(&ls->ls_rsbtbl[b].lock);
1089         return error;
1090 }
1091
1092 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
1093 {
1094         struct rb_node *n;
1095         struct dlm_rsb *r;
1096         int i;
1097
1098         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1099                 spin_lock(&ls->ls_rsbtbl[i].lock);
1100                 for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
1101                         r = rb_entry(n, struct dlm_rsb, res_hashnode);
1102                         if (r->res_hash == hash)
1103                                 dlm_dump_rsb(r);
1104                 }
1105                 spin_unlock(&ls->ls_rsbtbl[i].lock);
1106         }
1107 }
1108
1109 void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
1110 {
1111         struct dlm_rsb *r = NULL;
1112         uint32_t hash, b;
1113         int error;
1114
1115         hash = jhash(name, len, 0);
1116         b = hash & (ls->ls_rsbtbl_size - 1);
1117
1118         spin_lock(&ls->ls_rsbtbl[b].lock);
1119         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
1120         if (!error)
1121                 goto out_dump;
1122
1123         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1124         if (error)
1125                 goto out;
1126  out_dump:
1127         dlm_dump_rsb(r);
1128  out:
1129         spin_unlock(&ls->ls_rsbtbl[b].lock);
1130 }
1131
1132 static void toss_rsb(struct kref *kref)
1133 {
1134         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1135         struct dlm_ls *ls = r->res_ls;
1136
1137         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
1138         kref_init(&r->res_ref);
1139         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
1140         rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
1141         r->res_toss_time = jiffies;
1142         set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[r->res_bucket].flags);
1143         if (r->res_lvbptr) {
1144                 dlm_free_lvb(r->res_lvbptr);
1145                 r->res_lvbptr = NULL;
1146         }
1147 }
1148
1149 /* See comment for unhold_lkb */
1150
1151 static void unhold_rsb(struct dlm_rsb *r)
1152 {
1153         int rv;
1154         rv = kref_put(&r->res_ref, toss_rsb);
1155         DLM_ASSERT(!rv, dlm_dump_rsb(r););
1156 }
1157
1158 static void kill_rsb(struct kref *kref)
1159 {
1160         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1161
1162         /* All work is done after the return from kref_put() so we
1163            can release the write_lock before the remove and free. */
1164
1165         DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
1166         DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
1167         DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
1168         DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
1169         DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
1170         DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
1171 }
1172
1173 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
1174    The rsb must exist as long as any lkb's for it do. */
1175
1176 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1177 {
1178         hold_rsb(r);
1179         lkb->lkb_resource = r;
1180 }
1181
1182 static void detach_lkb(struct dlm_lkb *lkb)
1183 {
1184         if (lkb->lkb_resource) {
1185                 put_rsb(lkb->lkb_resource);
1186                 lkb->lkb_resource = NULL;
1187         }
1188 }
1189
1190 static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
1191                        int start, int end)
1192 {
1193         struct dlm_lkb *lkb;
1194         int rv;
1195
1196         lkb = dlm_allocate_lkb(ls);
1197         if (!lkb)
1198                 return -ENOMEM;
1199
1200         lkb->lkb_last_bast_mode = -1;
1201         lkb->lkb_nodeid = -1;
1202         lkb->lkb_grmode = DLM_LOCK_IV;
1203         kref_init(&lkb->lkb_ref);
1204         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
1205         INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
1206         INIT_LIST_HEAD(&lkb->lkb_cb_list);
1207         INIT_LIST_HEAD(&lkb->lkb_callbacks);
1208         spin_lock_init(&lkb->lkb_cb_lock);
1209         INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
1210
1211         idr_preload(GFP_NOFS);
1212         spin_lock(&ls->ls_lkbidr_spin);
1213         rv = idr_alloc(&ls->ls_lkbidr, lkb, start, end, GFP_NOWAIT);
1214         if (rv >= 0)
1215                 lkb->lkb_id = rv;
1216         spin_unlock(&ls->ls_lkbidr_spin);
1217         idr_preload_end();
1218
1219         if (rv < 0) {
1220                 log_error(ls, "create_lkb idr error %d", rv);
1221                 dlm_free_lkb(lkb);
1222                 return rv;
1223         }
1224
1225         *lkb_ret = lkb;
1226         return 0;
1227 }
1228
1229 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
1230 {
1231         return _create_lkb(ls, lkb_ret, 1, 0);
1232 }
1233
1234 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
1235 {
1236         struct dlm_lkb *lkb;
1237
1238         spin_lock(&ls->ls_lkbidr_spin);
1239         lkb = idr_find(&ls->ls_lkbidr, lkid);
1240         if (lkb)
1241                 kref_get(&lkb->lkb_ref);
1242         spin_unlock(&ls->ls_lkbidr_spin);
1243
1244         *lkb_ret = lkb;
1245         return lkb ? 0 : -ENOENT;
1246 }
1247
1248 static void kill_lkb(struct kref *kref)
1249 {
1250         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1251
1252         /* All work is done after the return from kref_put() so we
1253            can release the write_lock before the detach_lkb */
1254
1255         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1256 }
1257
1258 /* __put_lkb() is used when an lkb may not have an rsb attached to
1259    it so we need to provide the lockspace explicitly */
1260
1261 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
1262 {
1263         uint32_t lkid = lkb->lkb_id;
1264         int rv;
1265
1266         rv = kref_put_lock(&lkb->lkb_ref, kill_lkb,
1267                            &ls->ls_lkbidr_spin);
1268         if (rv) {
1269                 idr_remove(&ls->ls_lkbidr, lkid);
1270                 spin_unlock(&ls->ls_lkbidr_spin);
1271
1272                 detach_lkb(lkb);
1273
1274                 /* for local/process lkbs, lvbptr points to caller's lksb */
1275                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
1276                         dlm_free_lvb(lkb->lkb_lvbptr);
1277                 dlm_free_lkb(lkb);
1278         }
1279
1280         return rv;
1281 }
1282
1283 int dlm_put_lkb(struct dlm_lkb *lkb)
1284 {
1285         struct dlm_ls *ls;
1286
1287         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
1288         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
1289
1290         ls = lkb->lkb_resource->res_ls;
1291         return __put_lkb(ls, lkb);
1292 }
1293
1294 /* This is only called to add a reference when the code already holds
1295    a valid reference to the lkb, so there's no need for locking. */
1296
1297 static inline void hold_lkb(struct dlm_lkb *lkb)
1298 {
1299         kref_get(&lkb->lkb_ref);
1300 }
1301
1302 static void unhold_lkb_assert(struct kref *kref)
1303 {
1304         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1305
1306         DLM_ASSERT(false, dlm_print_lkb(lkb););
1307 }
1308
1309 /* This is called when we need to remove a reference and are certain
1310    it's not the last ref.  e.g. del_lkb is always called between a
1311    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
1312    put_lkb would work fine, but would involve unnecessary locking */
1313
1314 static inline void unhold_lkb(struct dlm_lkb *lkb)
1315 {
1316         kref_put(&lkb->lkb_ref, unhold_lkb_assert);
1317 }
1318
1319 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
1320                             int mode)
1321 {
1322         struct dlm_lkb *lkb = NULL, *iter;
1323
1324         list_for_each_entry(iter, head, lkb_statequeue)
1325                 if (iter->lkb_rqmode < mode) {
1326                         lkb = iter;
1327                         list_add_tail(new, &iter->lkb_statequeue);
1328                         break;
1329                 }
1330
1331         if (!lkb)
1332                 list_add_tail(new, head);
1333 }
1334
1335 /* add/remove lkb to rsb's grant/convert/wait queue */
1336
1337 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
1338 {
1339         kref_get(&lkb->lkb_ref);
1340
1341         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1342
1343         lkb->lkb_timestamp = ktime_get();
1344
1345         lkb->lkb_status = status;
1346
1347         switch (status) {
1348         case DLM_LKSTS_WAITING:
1349                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1350                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
1351                 else
1352                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
1353                 break;
1354         case DLM_LKSTS_GRANTED:
1355                 /* convention says granted locks kept in order of grmode */
1356                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
1357                                 lkb->lkb_grmode);
1358                 break;
1359         case DLM_LKSTS_CONVERT:
1360                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1361                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
1362                 else
1363                         list_add_tail(&lkb->lkb_statequeue,
1364                                       &r->res_convertqueue);
1365                 break;
1366         default:
1367                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
1368         }
1369 }
1370
1371 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1372 {
1373         lkb->lkb_status = 0;
1374         list_del(&lkb->lkb_statequeue);
1375         unhold_lkb(lkb);
1376 }
1377
1378 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
1379 {
1380         hold_lkb(lkb);
1381         del_lkb(r, lkb);
1382         add_lkb(r, lkb, sts);
1383         unhold_lkb(lkb);
1384 }
1385
1386 static int msg_reply_type(int mstype)
1387 {
1388         switch (mstype) {
1389         case DLM_MSG_REQUEST:
1390                 return DLM_MSG_REQUEST_REPLY;
1391         case DLM_MSG_CONVERT:
1392                 return DLM_MSG_CONVERT_REPLY;
1393         case DLM_MSG_UNLOCK:
1394                 return DLM_MSG_UNLOCK_REPLY;
1395         case DLM_MSG_CANCEL:
1396                 return DLM_MSG_CANCEL_REPLY;
1397         case DLM_MSG_LOOKUP:
1398                 return DLM_MSG_LOOKUP_REPLY;
1399         }
1400         return -1;
1401 }
1402
1403 /* add/remove lkb from global waiters list of lkb's waiting for
1404    a reply from a remote node */
1405
1406 static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
1407 {
1408         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1409         int error = 0;
1410
1411         mutex_lock(&ls->ls_waiters_mutex);
1412
1413         if (is_overlap_unlock(lkb) ||
1414             (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
1415                 error = -EINVAL;
1416                 goto out;
1417         }
1418
1419         if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
1420                 switch (mstype) {
1421                 case DLM_MSG_UNLOCK:
1422                         set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
1423                         break;
1424                 case DLM_MSG_CANCEL:
1425                         set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
1426                         break;
1427                 default:
1428                         error = -EBUSY;
1429                         goto out;
1430                 }
1431                 lkb->lkb_wait_count++;
1432                 hold_lkb(lkb);
1433
1434                 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
1435                           lkb->lkb_id, lkb->lkb_wait_type, mstype,
1436                           lkb->lkb_wait_count, dlm_iflags_val(lkb));
1437                 goto out;
1438         }
1439
1440         DLM_ASSERT(!lkb->lkb_wait_count,
1441                    dlm_print_lkb(lkb);
1442                    printk("wait_count %d\n", lkb->lkb_wait_count););
1443
1444         lkb->lkb_wait_count++;
1445         lkb->lkb_wait_type = mstype;
1446         lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
1447         hold_lkb(lkb);
1448         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
1449  out:
1450         if (error)
1451                 log_error(ls, "addwait error %x %d flags %x %d %d %s",
1452                           lkb->lkb_id, error, dlm_iflags_val(lkb), mstype,
1453                           lkb->lkb_wait_type, lkb->lkb_resource->res_name);
1454         mutex_unlock(&ls->ls_waiters_mutex);
1455         return error;
1456 }
1457
1458 /* We clear the RESEND flag because we might be taking an lkb off the waiters
1459    list as part of process_requestqueue (e.g. a lookup that has an optimized
1460    request reply on the requestqueue) between dlm_recover_waiters_pre() which
1461    set RESEND and dlm_recover_waiters_post() */
1462
1463 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1464                                 const struct dlm_message *ms)
1465 {
1466         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1467         int overlap_done = 0;
1468
1469         if (mstype == DLM_MSG_UNLOCK_REPLY &&
1470             test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
1471                 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1472                 overlap_done = 1;
1473                 goto out_del;
1474         }
1475
1476         if (mstype == DLM_MSG_CANCEL_REPLY &&
1477             test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
1478                 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
1479                 overlap_done = 1;
1480                 goto out_del;
1481         }
1482
1483         /* Cancel state was preemptively cleared by a successful convert,
1484            see next comment, nothing to do. */
1485
1486         if ((mstype == DLM_MSG_CANCEL_REPLY) &&
1487             (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
1488                 log_debug(ls, "remwait %x cancel_reply wait_type %d",
1489                           lkb->lkb_id, lkb->lkb_wait_type);
1490                 return -1;
1491         }
1492
1493         /* Remove for the convert reply, and premptively remove for the
1494            cancel reply.  A convert has been granted while there's still
1495            an outstanding cancel on it (the cancel is moot and the result
1496            in the cancel reply should be 0).  We preempt the cancel reply
1497            because the app gets the convert result and then can follow up
1498            with another op, like convert.  This subsequent op would see the
1499            lingering state of the cancel and fail with -EBUSY. */
1500
1501         if ((mstype == DLM_MSG_CONVERT_REPLY) &&
1502             (lkb->lkb_wait_type == DLM_MSG_CONVERT) && ms && !ms->m_result &&
1503             test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
1504                 log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
1505                           lkb->lkb_id);
1506                 lkb->lkb_wait_type = 0;
1507                 lkb->lkb_wait_count--;
1508                 unhold_lkb(lkb);
1509                 goto out_del;
1510         }
1511
1512         /* N.B. type of reply may not always correspond to type of original
1513            msg due to lookup->request optimization, verify others? */
1514
1515         if (lkb->lkb_wait_type) {
1516                 lkb->lkb_wait_type = 0;
1517                 goto out_del;
1518         }
1519
1520         log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
1521                   lkb->lkb_id, ms ? le32_to_cpu(ms->m_header.h_nodeid) : 0,
1522                   lkb->lkb_remid, mstype, dlm_iflags_val(lkb));
1523         return -1;
1524
1525  out_del:
1526         /* the force-unlock/cancel has completed and we haven't recvd a reply
1527            to the op that was in progress prior to the unlock/cancel; we
1528            give up on any reply to the earlier op.  FIXME: not sure when/how
1529            this would happen */
1530
1531         if (overlap_done && lkb->lkb_wait_type) {
1532                 log_error(ls, "remwait error %x reply %d wait_type %d overlap",
1533                           lkb->lkb_id, mstype, lkb->lkb_wait_type);
1534                 lkb->lkb_wait_count--;
1535                 unhold_lkb(lkb);
1536                 lkb->lkb_wait_type = 0;
1537         }
1538
1539         DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
1540
1541         clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
1542         lkb->lkb_wait_count--;
1543         if (!lkb->lkb_wait_count)
1544                 list_del_init(&lkb->lkb_wait_reply);
1545         unhold_lkb(lkb);
1546         return 0;
1547 }
1548
1549 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1550 {
1551         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1552         int error;
1553
1554         mutex_lock(&ls->ls_waiters_mutex);
1555         error = _remove_from_waiters(lkb, mstype, NULL);
1556         mutex_unlock(&ls->ls_waiters_mutex);
1557         return error;
1558 }
1559
1560 /* Handles situations where we might be processing a "fake" or "local" reply in
1561    which we can't try to take waiters_mutex again. */
1562
1563 static int remove_from_waiters_ms(struct dlm_lkb *lkb,
1564                                   const struct dlm_message *ms, bool local)
1565 {
1566         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1567         int error;
1568
1569         if (!local)
1570                 mutex_lock(&ls->ls_waiters_mutex);
1571         error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
1572         if (!local)
1573                 mutex_unlock(&ls->ls_waiters_mutex);
1574         return error;
1575 }
1576
1577 static void shrink_bucket(struct dlm_ls *ls, int b)
1578 {
1579         struct rb_node *n, *next;
1580         struct dlm_rsb *r;
1581         char *name;
1582         int our_nodeid = dlm_our_nodeid();
1583         int remote_count = 0;
1584         int need_shrink = 0;
1585         int i, len, rv;
1586
1587         memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);
1588
1589         spin_lock(&ls->ls_rsbtbl[b].lock);
1590
1591         if (!test_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags)) {
1592                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1593                 return;
1594         }
1595
1596         for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) {
1597                 next = rb_next(n);
1598                 r = rb_entry(n, struct dlm_rsb, res_hashnode);
1599
1600                 /* If we're the directory record for this rsb, and
1601                    we're not the master of it, then we need to wait
1602                    for the master node to send us a dir remove for
1603                    before removing the dir record. */
1604
1605                 if (!dlm_no_directory(ls) &&
1606                     (r->res_master_nodeid != our_nodeid) &&
1607                     (dlm_dir_nodeid(r) == our_nodeid)) {
1608                         continue;
1609                 }
1610
1611                 need_shrink = 1;
1612
1613                 if (!time_after_eq(jiffies, r->res_toss_time +
1614                                    dlm_config.ci_toss_secs * HZ)) {
1615                         continue;
1616                 }
1617
1618                 if (!dlm_no_directory(ls) &&
1619                     (r->res_master_nodeid == our_nodeid) &&
1620                     (dlm_dir_nodeid(r) != our_nodeid)) {
1621
1622                         /* We're the master of this rsb but we're not
1623                            the directory record, so we need to tell the
1624                            dir node to remove the dir record. */
1625
1626                         ls->ls_remove_lens[remote_count] = r->res_length;
1627                         memcpy(ls->ls_remove_names[remote_count], r->res_name,
1628                                DLM_RESNAME_MAXLEN);
1629                         remote_count++;
1630
1631                         if (remote_count >= DLM_REMOVE_NAMES_MAX)
1632                                 break;
1633                         continue;
1634                 }
1635
1636                 if (!kref_put(&r->res_ref, kill_rsb)) {
1637                         log_error(ls, "tossed rsb in use %s", r->res_name);
1638                         continue;
1639                 }
1640
1641                 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1642                 dlm_free_rsb(r);
1643         }
1644
1645         if (need_shrink)
1646                 set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags);
1647         else
1648                 clear_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags);
1649         spin_unlock(&ls->ls_rsbtbl[b].lock);
1650
1651         /*
1652          * While searching for rsb's to free, we found some that require
1653          * remote removal.  We leave them in place and find them again here
1654          * so there is a very small gap between removing them from the toss
1655          * list and sending the removal.  Keeping this gap small is
1656          * important to keep us (the master node) from being out of sync
1657          * with the remote dir node for very long.
1658          */
1659
1660         for (i = 0; i < remote_count; i++) {
1661                 name = ls->ls_remove_names[i];
1662                 len = ls->ls_remove_lens[i];
1663
1664                 spin_lock(&ls->ls_rsbtbl[b].lock);
1665                 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1666                 if (rv) {
1667                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1668                         log_debug(ls, "remove_name not toss %s", name);
1669                         continue;
1670                 }
1671
1672                 if (r->res_master_nodeid != our_nodeid) {
1673                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1674                         log_debug(ls, "remove_name master %d dir %d our %d %s",
1675                                   r->res_master_nodeid, r->res_dir_nodeid,
1676                                   our_nodeid, name);
1677                         continue;
1678                 }
1679
1680                 if (r->res_dir_nodeid == our_nodeid) {
1681                         /* should never happen */
1682                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1683                         log_error(ls, "remove_name dir %d master %d our %d %s",
1684                                   r->res_dir_nodeid, r->res_master_nodeid,
1685                                   our_nodeid, name);
1686                         continue;
1687                 }
1688
1689                 if (!time_after_eq(jiffies, r->res_toss_time +
1690                                    dlm_config.ci_toss_secs * HZ)) {
1691                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1692                         log_debug(ls, "remove_name toss_time %lu now %lu %s",
1693                                   r->res_toss_time, jiffies, name);
1694                         continue;
1695                 }
1696
1697                 if (!kref_put(&r->res_ref, kill_rsb)) {
1698                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1699                         log_error(ls, "remove_name in use %s", name);
1700                         continue;
1701                 }
1702
1703                 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1704                 send_remove(r);
1705                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1706
1707                 dlm_free_rsb(r);
1708         }
1709 }
1710
1711 void dlm_scan_rsbs(struct dlm_ls *ls)
1712 {
1713         int i;
1714
1715         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1716                 shrink_bucket(ls, i);
1717                 if (dlm_locking_stopped(ls))
1718                         break;
1719                 cond_resched();
1720         }
1721 }
1722
1723 /* lkb is master or local copy */
1724
1725 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1726 {
1727         int b, len = r->res_ls->ls_lvblen;
1728
1729         /* b=1 lvb returned to caller
1730            b=0 lvb written to rsb or invalidated
1731            b=-1 do nothing */
1732
1733         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1734
1735         if (b == 1) {
1736                 if (!lkb->lkb_lvbptr)
1737                         return;
1738
1739                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1740                         return;
1741
1742                 if (!r->res_lvbptr)
1743                         return;
1744
1745                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1746                 lkb->lkb_lvbseq = r->res_lvbseq;
1747
1748         } else if (b == 0) {
1749                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1750                         rsb_set_flag(r, RSB_VALNOTVALID);
1751                         return;
1752                 }
1753
1754                 if (!lkb->lkb_lvbptr)
1755                         return;
1756
1757                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1758                         return;
1759
1760                 if (!r->res_lvbptr)
1761                         r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1762
1763                 if (!r->res_lvbptr)
1764                         return;
1765
1766                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1767                 r->res_lvbseq++;
1768                 lkb->lkb_lvbseq = r->res_lvbseq;
1769                 rsb_clear_flag(r, RSB_VALNOTVALID);
1770         }
1771
1772         if (rsb_flag(r, RSB_VALNOTVALID))
1773                 set_bit(DLM_SBF_VALNOTVALID_BIT, &lkb->lkb_sbflags);
1774 }
1775
1776 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1777 {
1778         if (lkb->lkb_grmode < DLM_LOCK_PW)
1779                 return;
1780
1781         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1782                 rsb_set_flag(r, RSB_VALNOTVALID);
1783                 return;
1784         }
1785
1786         if (!lkb->lkb_lvbptr)
1787                 return;
1788
1789         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1790                 return;
1791
1792         if (!r->res_lvbptr)
1793                 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1794
1795         if (!r->res_lvbptr)
1796                 return;
1797
1798         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1799         r->res_lvbseq++;
1800         rsb_clear_flag(r, RSB_VALNOTVALID);
1801 }
1802
1803 /* lkb is process copy (pc) */
1804
1805 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1806                             const struct dlm_message *ms)
1807 {
1808         int b;
1809
1810         if (!lkb->lkb_lvbptr)
1811                 return;
1812
1813         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1814                 return;
1815
1816         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1817         if (b == 1) {
1818                 int len = receive_extralen(ms);
1819                 if (len > r->res_ls->ls_lvblen)
1820                         len = r->res_ls->ls_lvblen;
1821                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1822                 lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
1823         }
1824 }
1825
1826 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1827    remove_lock -- used for unlock, removes lkb from granted
1828    revert_lock -- used for cancel, moves lkb from convert to granted
1829    grant_lock  -- used for request and convert, adds lkb to granted or
1830                   moves lkb from convert or waiting to granted
1831
1832    Each of these is used for master or local copy lkb's.  There is
1833    also a _pc() variation used to make the corresponding change on
1834    a process copy (pc) lkb. */
1835
1836 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1837 {
1838         del_lkb(r, lkb);
1839         lkb->lkb_grmode = DLM_LOCK_IV;
1840         /* this unhold undoes the original ref from create_lkb()
1841            so this leads to the lkb being freed */
1842         unhold_lkb(lkb);
1843 }
1844
1845 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1846 {
1847         set_lvb_unlock(r, lkb);
1848         _remove_lock(r, lkb);
1849 }
1850
1851 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1852 {
1853         _remove_lock(r, lkb);
1854 }
1855
1856 /* returns: 0 did nothing
1857             1 moved lock to granted
1858            -1 removed lock */
1859
1860 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1861 {
1862         int rv = 0;
1863
1864         lkb->lkb_rqmode = DLM_LOCK_IV;
1865
1866         switch (lkb->lkb_status) {
1867         case DLM_LKSTS_GRANTED:
1868                 break;
1869         case DLM_LKSTS_CONVERT:
1870                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1871                 rv = 1;
1872                 break;
1873         case DLM_LKSTS_WAITING:
1874                 del_lkb(r, lkb);
1875                 lkb->lkb_grmode = DLM_LOCK_IV;
1876                 /* this unhold undoes the original ref from create_lkb()
1877                    so this leads to the lkb being freed */
1878                 unhold_lkb(lkb);
1879                 rv = -1;
1880                 break;
1881         default:
1882                 log_print("invalid status for revert %d", lkb->lkb_status);
1883         }
1884         return rv;
1885 }
1886
1887 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1888 {
1889         return revert_lock(r, lkb);
1890 }
1891
1892 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1893 {
1894         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1895                 lkb->lkb_grmode = lkb->lkb_rqmode;
1896                 if (lkb->lkb_status)
1897                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1898                 else
1899                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1900         }
1901
1902         lkb->lkb_rqmode = DLM_LOCK_IV;
1903         lkb->lkb_highbast = 0;
1904 }
1905
1906 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1907 {
1908         set_lvb_lock(r, lkb);
1909         _grant_lock(r, lkb);
1910 }
1911
1912 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1913                           const struct dlm_message *ms)
1914 {
1915         set_lvb_lock_pc(r, lkb, ms);
1916         _grant_lock(r, lkb);
1917 }
1918
1919 /* called by grant_pending_locks() which means an async grant message must
1920    be sent to the requesting node in addition to granting the lock if the
1921    lkb belongs to a remote node. */
1922
1923 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1924 {
1925         grant_lock(r, lkb);
1926         if (is_master_copy(lkb))
1927                 send_grant(r, lkb);
1928         else
1929                 queue_cast(r, lkb, 0);
1930 }
1931
1932 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1933    change the granted/requested modes.  We're munging things accordingly in
1934    the process copy.
1935    CONVDEADLK: our grmode may have been forced down to NL to resolve a
1936    conversion deadlock
1937    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1938    compatible with other granted locks */
1939
1940 static void munge_demoted(struct dlm_lkb *lkb)
1941 {
1942         if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1943                 log_print("munge_demoted %x invalid modes gr %d rq %d",
1944                           lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1945                 return;
1946         }
1947
1948         lkb->lkb_grmode = DLM_LOCK_NL;
1949 }
1950
1951 static void munge_altmode(struct dlm_lkb *lkb, const struct dlm_message *ms)
1952 {
1953         if (ms->m_type != cpu_to_le32(DLM_MSG_REQUEST_REPLY) &&
1954             ms->m_type != cpu_to_le32(DLM_MSG_GRANT)) {
1955                 log_print("munge_altmode %x invalid reply type %d",
1956                           lkb->lkb_id, le32_to_cpu(ms->m_type));
1957                 return;
1958         }
1959
1960         if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1961                 lkb->lkb_rqmode = DLM_LOCK_PR;
1962         else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1963                 lkb->lkb_rqmode = DLM_LOCK_CW;
1964         else {
1965                 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1966                 dlm_print_lkb(lkb);
1967         }
1968 }
1969
1970 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1971 {
1972         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1973                                            lkb_statequeue);
1974         if (lkb->lkb_id == first->lkb_id)
1975                 return 1;
1976
1977         return 0;
1978 }
1979
1980 /* Check if the given lkb conflicts with another lkb on the queue. */
1981
1982 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1983 {
1984         struct dlm_lkb *this;
1985
1986         list_for_each_entry(this, head, lkb_statequeue) {
1987                 if (this == lkb)
1988                         continue;
1989                 if (!modes_compat(this, lkb))
1990                         return 1;
1991         }
1992         return 0;
1993 }
1994
1995 /*
1996  * "A conversion deadlock arises with a pair of lock requests in the converting
1997  * queue for one resource.  The granted mode of each lock blocks the requested
1998  * mode of the other lock."
1999  *
2000  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
2001  * convert queue from being granted, then deadlk/demote lkb.
2002  *
2003  * Example:
2004  * Granted Queue: empty
2005  * Convert Queue: NL->EX (first lock)
2006  *                PR->EX (second lock)
2007  *
2008  * The first lock can't be granted because of the granted mode of the second
2009  * lock and the second lock can't be granted because it's not first in the
2010  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
2011  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
2012  * flag set and return DEMOTED in the lksb flags.
2013  *
2014  * Originally, this function detected conv-deadlk in a more limited scope:
2015  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
2016  * - if lkb1 was the first entry in the queue (not just earlier), and was
2017  *   blocked by the granted mode of lkb2, and there was nothing on the
2018  *   granted queue preventing lkb1 from being granted immediately, i.e.
2019  *   lkb2 was the only thing preventing lkb1 from being granted.
2020  *
2021  * That second condition meant we'd only say there was conv-deadlk if
2022  * resolving it (by demotion) would lead to the first lock on the convert
2023  * queue being granted right away.  It allowed conversion deadlocks to exist
2024  * between locks on the convert queue while they couldn't be granted anyway.
2025  *
2026  * Now, we detect and take action on conversion deadlocks immediately when
2027  * they're created, even if they may not be immediately consequential.  If
2028  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
2029  * mode that would prevent lkb1's conversion from being granted, we do a
2030  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
2031  * I think this means that the lkb_is_ahead condition below should always
2032  * be zero, i.e. there will never be conv-deadlk between two locks that are
2033  * both already on the convert queue.
2034  */
2035
2036 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
2037 {
2038         struct dlm_lkb *lkb1;
2039         int lkb_is_ahead = 0;
2040
2041         list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
2042                 if (lkb1 == lkb2) {
2043                         lkb_is_ahead = 1;
2044                         continue;
2045                 }
2046
2047                 if (!lkb_is_ahead) {
2048                         if (!modes_compat(lkb2, lkb1))
2049                                 return 1;
2050                 } else {
2051                         if (!modes_compat(lkb2, lkb1) &&
2052                             !modes_compat(lkb1, lkb2))
2053                                 return 1;
2054                 }
2055         }
2056         return 0;
2057 }
2058
2059 /*
2060  * Return 1 if the lock can be granted, 0 otherwise.
2061  * Also detect and resolve conversion deadlocks.
2062  *
2063  * lkb is the lock to be granted
2064  *
2065  * now is 1 if the function is being called in the context of the
2066  * immediate request, it is 0 if called later, after the lock has been
2067  * queued.
2068  *
2069  * recover is 1 if dlm_recover_grant() is trying to grant conversions
2070  * after recovery.
2071  *
2072  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
2073  */
2074
2075 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2076                            int recover)
2077 {
2078         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
2079
2080         /*
2081          * 6-10: Version 5.4 introduced an option to address the phenomenon of
2082          * a new request for a NL mode lock being blocked.
2083          *
2084          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
2085          * request, then it would be granted.  In essence, the use of this flag
2086          * tells the Lock Manager to expedite theis request by not considering
2087          * what may be in the CONVERTING or WAITING queues...  As of this
2088          * writing, the EXPEDITE flag can be used only with new requests for NL
2089          * mode locks.  This flag is not valid for conversion requests.
2090          *
2091          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
2092          * conversion or used with a non-NL requested mode.  We also know an
2093          * EXPEDITE request is always granted immediately, so now must always
2094          * be 1.  The full condition to grant an expedite request: (now &&
2095          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
2096          * therefore be shortened to just checking the flag.
2097          */
2098
2099         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
2100                 return 1;
2101
2102         /*
2103          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
2104          * added to the remaining conditions.
2105          */
2106
2107         if (queue_conflict(&r->res_grantqueue, lkb))
2108                 return 0;
2109
2110         /*
2111          * 6-3: By default, a conversion request is immediately granted if the
2112          * requested mode is compatible with the modes of all other granted
2113          * locks
2114          */
2115
2116         if (queue_conflict(&r->res_convertqueue, lkb))
2117                 return 0;
2118
2119         /*
2120          * The RECOVER_GRANT flag means dlm_recover_grant() is granting
2121          * locks for a recovered rsb, on which lkb's have been rebuilt.
2122          * The lkb's may have been rebuilt on the queues in a different
2123          * order than they were in on the previous master.  So, granting
2124          * queued conversions in order after recovery doesn't make sense
2125          * since the order hasn't been preserved anyway.  The new order
2126          * could also have created a new "in place" conversion deadlock.
2127          * (e.g. old, failed master held granted EX, with PR->EX, NL->EX.
2128          * After recovery, there would be no granted locks, and possibly
2129          * NL->EX, PR->EX, an in-place conversion deadlock.)  So, after
2130          * recovery, grant conversions without considering order.
2131          */
2132
2133         if (conv && recover)
2134                 return 1;
2135
2136         /*
2137          * 6-5: But the default algorithm for deciding whether to grant or
2138          * queue conversion requests does not by itself guarantee that such
2139          * requests are serviced on a "first come first serve" basis.  This, in
2140          * turn, can lead to a phenomenon known as "indefinate postponement".
2141          *
2142          * 6-7: This issue is dealt with by using the optional QUECVT flag with
2143          * the system service employed to request a lock conversion.  This flag
2144          * forces certain conversion requests to be queued, even if they are
2145          * compatible with the granted modes of other locks on the same
2146          * resource.  Thus, the use of this flag results in conversion requests
2147          * being ordered on a "first come first servce" basis.
2148          *
2149          * DCT: This condition is all about new conversions being able to occur
2150          * "in place" while the lock remains on the granted queue (assuming
2151          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
2152          * doesn't _have_ to go onto the convert queue where it's processed in
2153          * order.  The "now" variable is necessary to distinguish converts
2154          * being received and processed for the first time now, because once a
2155          * convert is moved to the conversion queue the condition below applies
2156          * requiring fifo granting.
2157          */
2158
2159         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
2160                 return 1;
2161
2162         /*
2163          * Even if the convert is compat with all granted locks,
2164          * QUECVT forces it behind other locks on the convert queue.
2165          */
2166
2167         if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
2168                 if (list_empty(&r->res_convertqueue))
2169                         return 1;
2170                 else
2171                         return 0;
2172         }
2173
2174         /*
2175          * The NOORDER flag is set to avoid the standard vms rules on grant
2176          * order.
2177          */
2178
2179         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
2180                 return 1;
2181
2182         /*
2183          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
2184          * granted until all other conversion requests ahead of it are granted
2185          * and/or canceled.
2186          */
2187
2188         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
2189                 return 1;
2190
2191         /*
2192          * 6-4: By default, a new request is immediately granted only if all
2193          * three of the following conditions are satisfied when the request is
2194          * issued:
2195          * - The queue of ungranted conversion requests for the resource is
2196          *   empty.
2197          * - The queue of ungranted new requests for the resource is empty.
2198          * - The mode of the new request is compatible with the most
2199          *   restrictive mode of all granted locks on the resource.
2200          */
2201
2202         if (now && !conv && list_empty(&r->res_convertqueue) &&
2203             list_empty(&r->res_waitqueue))
2204                 return 1;
2205
2206         /*
2207          * 6-4: Once a lock request is in the queue of ungranted new requests,
2208          * it cannot be granted until the queue of ungranted conversion
2209          * requests is empty, all ungranted new requests ahead of it are
2210          * granted and/or canceled, and it is compatible with the granted mode
2211          * of the most restrictive lock granted on the resource.
2212          */
2213
2214         if (!now && !conv && list_empty(&r->res_convertqueue) &&
2215             first_in_list(lkb, &r->res_waitqueue))
2216                 return 1;
2217
2218         return 0;
2219 }
2220
2221 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2222                           int recover, int *err)
2223 {
2224         int rv;
2225         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
2226         int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
2227
2228         if (err)
2229                 *err = 0;
2230
2231         rv = _can_be_granted(r, lkb, now, recover);
2232         if (rv)
2233                 goto out;
2234
2235         /*
2236          * The CONVDEADLK flag is non-standard and tells the dlm to resolve
2237          * conversion deadlocks by demoting grmode to NL, otherwise the dlm
2238          * cancels one of the locks.
2239          */
2240
2241         if (is_convert && can_be_queued(lkb) &&
2242             conversion_deadlock_detect(r, lkb)) {
2243                 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
2244                         lkb->lkb_grmode = DLM_LOCK_NL;
2245                         set_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
2246                 } else if (err) {
2247                         *err = -EDEADLK;
2248                 } else {
2249                         log_print("can_be_granted deadlock %x now %d",
2250                                   lkb->lkb_id, now);
2251                         dlm_dump_rsb(r);
2252                 }
2253                 goto out;
2254         }
2255
2256         /*
2257          * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
2258          * to grant a request in a mode other than the normal rqmode.  It's a
2259          * simple way to provide a big optimization to applications that can
2260          * use them.
2261          */
2262
2263         if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
2264                 alt = DLM_LOCK_PR;
2265         else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
2266                 alt = DLM_LOCK_CW;
2267
2268         if (alt) {
2269                 lkb->lkb_rqmode = alt;
2270                 rv = _can_be_granted(r, lkb, now, 0);
2271                 if (rv)
2272                         set_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
2273                 else
2274                         lkb->lkb_rqmode = rqmode;
2275         }
2276  out:
2277         return rv;
2278 }
2279
2280 /* Returns the highest requested mode of all blocked conversions; sets
2281    cw if there's a blocked conversion to DLM_LOCK_CW. */
2282
2283 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
2284                                  unsigned int *count)
2285 {
2286         struct dlm_lkb *lkb, *s;
2287         int recover = rsb_flag(r, RSB_RECOVER_GRANT);
2288         int hi, demoted, quit, grant_restart, demote_restart;
2289         int deadlk;
2290
2291         quit = 0;
2292  restart:
2293         grant_restart = 0;
2294         demote_restart = 0;
2295         hi = DLM_LOCK_IV;
2296
2297         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
2298                 demoted = is_demoted(lkb);
2299                 deadlk = 0;
2300
2301                 if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
2302                         grant_lock_pending(r, lkb);
2303                         grant_restart = 1;
2304                         if (count)
2305                                 (*count)++;
2306                         continue;
2307                 }
2308
2309                 if (!demoted && is_demoted(lkb)) {
2310                         log_print("WARN: pending demoted %x node %d %s",
2311                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2312                         demote_restart = 1;
2313                         continue;
2314                 }
2315
2316                 if (deadlk) {
2317                         /*
2318                          * If DLM_LKB_NODLKWT flag is set and conversion
2319                          * deadlock is detected, we request blocking AST and
2320                          * down (or cancel) conversion.
2321                          */
2322                         if (lkb->lkb_exflags & DLM_LKF_NODLCKWT) {
2323                                 if (lkb->lkb_highbast < lkb->lkb_rqmode) {
2324                                         queue_bast(r, lkb, lkb->lkb_rqmode);
2325                                         lkb->lkb_highbast = lkb->lkb_rqmode;
2326                                 }
2327                         } else {
2328                                 log_print("WARN: pending deadlock %x node %d %s",
2329                                           lkb->lkb_id, lkb->lkb_nodeid,
2330                                           r->res_name);
2331                                 dlm_dump_rsb(r);
2332                         }
2333                         continue;
2334                 }
2335
2336                 hi = max_t(int, lkb->lkb_rqmode, hi);
2337
2338                 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
2339                         *cw = 1;
2340         }
2341
2342         if (grant_restart)
2343                 goto restart;
2344         if (demote_restart && !quit) {
2345                 quit = 1;
2346                 goto restart;
2347         }
2348
2349         return max_t(int, high, hi);
2350 }
2351
2352 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
2353                               unsigned int *count)
2354 {
2355         struct dlm_lkb *lkb, *s;
2356
2357         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
2358                 if (can_be_granted(r, lkb, 0, 0, NULL)) {
2359                         grant_lock_pending(r, lkb);
2360                         if (count)
2361                                 (*count)++;
2362                 } else {
2363                         high = max_t(int, lkb->lkb_rqmode, high);
2364                         if (lkb->lkb_rqmode == DLM_LOCK_CW)
2365                                 *cw = 1;
2366                 }
2367         }
2368
2369         return high;
2370 }
2371
2372 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
2373    on either the convert or waiting queue.
2374    high is the largest rqmode of all locks blocked on the convert or
2375    waiting queue. */
2376
2377 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
2378 {
2379         if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
2380                 if (gr->lkb_highbast < DLM_LOCK_EX)
2381                         return 1;
2382                 return 0;
2383         }
2384
2385         if (gr->lkb_highbast < high &&
2386             !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
2387                 return 1;
2388         return 0;
2389 }
2390
2391 static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
2392 {
2393         struct dlm_lkb *lkb, *s;
2394         int high = DLM_LOCK_IV;
2395         int cw = 0;
2396
2397         if (!is_master(r)) {
2398                 log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
2399                 dlm_dump_rsb(r);
2400                 return;
2401         }
2402
2403         high = grant_pending_convert(r, high, &cw, count);
2404         high = grant_pending_wait(r, high, &cw, count);
2405
2406         if (high == DLM_LOCK_IV)
2407                 return;
2408
2409         /*
2410          * If there are locks left on the wait/convert queue then send blocking
2411          * ASTs to granted locks based on the largest requested mode (high)
2412          * found above.
2413          */
2414
2415         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
2416                 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
2417                         if (cw && high == DLM_LOCK_PR &&
2418                             lkb->lkb_grmode == DLM_LOCK_PR)
2419                                 queue_bast(r, lkb, DLM_LOCK_CW);
2420                         else
2421                                 queue_bast(r, lkb, high);
2422                         lkb->lkb_highbast = high;
2423                 }
2424         }
2425 }
2426
2427 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
2428 {
2429         if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
2430             (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
2431                 if (gr->lkb_highbast < DLM_LOCK_EX)
2432                         return 1;
2433                 return 0;
2434         }
2435
2436         if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
2437                 return 1;
2438         return 0;
2439 }
2440
2441 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2442                             struct dlm_lkb *lkb)
2443 {
2444         struct dlm_lkb *gr;
2445
2446         list_for_each_entry(gr, head, lkb_statequeue) {
2447                 /* skip self when sending basts to convertqueue */
2448                 if (gr == lkb)
2449                         continue;
2450                 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2451                         queue_bast(r, gr, lkb->lkb_rqmode);
2452                         gr->lkb_highbast = lkb->lkb_rqmode;
2453                 }
2454         }
2455 }
2456
2457 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2458 {
2459         send_bast_queue(r, &r->res_grantqueue, lkb);
2460 }
2461
2462 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2463 {
2464         send_bast_queue(r, &r->res_grantqueue, lkb);
2465         send_bast_queue(r, &r->res_convertqueue, lkb);
2466 }
2467
2468 /* set_master(r, lkb) -- set the master nodeid of a resource
2469
2470    The purpose of this function is to set the nodeid field in the given
2471    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
2472    known, it can just be copied to the lkb and the function will return
2473    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
2474    before it can be copied to the lkb.
2475
2476    When the rsb nodeid is being looked up remotely, the initial lkb
2477    causing the lookup is kept on the ls_waiters list waiting for the
2478    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
2479    on the rsb's res_lookup list until the master is verified.
2480
2481    Return values:
2482    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2483    1: the rsb master is not available and the lkb has been placed on
2484       a wait queue
2485 */
2486
2487 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2488 {
2489         int our_nodeid = dlm_our_nodeid();
2490
2491         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2492                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2493                 r->res_first_lkid = lkb->lkb_id;
2494                 lkb->lkb_nodeid = r->res_nodeid;
2495                 return 0;
2496         }
2497
2498         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2499                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2500                 return 1;
2501         }
2502
2503         if (r->res_master_nodeid == our_nodeid) {
2504                 lkb->lkb_nodeid = 0;
2505                 return 0;
2506         }
2507
2508         if (r->res_master_nodeid) {
2509                 lkb->lkb_nodeid = r->res_master_nodeid;
2510                 return 0;
2511         }
2512
2513         if (dlm_dir_nodeid(r) == our_nodeid) {
2514                 /* This is a somewhat unusual case; find_rsb will usually
2515                    have set res_master_nodeid when dir nodeid is local, but
2516                    there are cases where we become the dir node after we've
2517                    past find_rsb and go through _request_lock again.
2518                    confirm_master() or process_lookup_list() needs to be
2519                    called after this. */
2520                 log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
2521                           lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2522                           r->res_name);
2523                 r->res_master_nodeid = our_nodeid;
2524                 r->res_nodeid = 0;
2525                 lkb->lkb_nodeid = 0;
2526                 return 0;
2527         }
2528
2529         r->res_first_lkid = lkb->lkb_id;
2530         send_lookup(r, lkb);
2531         return 1;
2532 }
2533
2534 static void process_lookup_list(struct dlm_rsb *r)
2535 {
2536         struct dlm_lkb *lkb, *safe;
2537
2538         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2539                 list_del_init(&lkb->lkb_rsb_lookup);
2540                 _request_lock(r, lkb);
2541                 schedule();
2542         }
2543 }
2544
2545 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2546
2547 static void confirm_master(struct dlm_rsb *r, int error)
2548 {
2549         struct dlm_lkb *lkb;
2550
2551         if (!r->res_first_lkid)
2552                 return;
2553
2554         switch (error) {
2555         case 0:
2556         case -EINPROGRESS:
2557                 r->res_first_lkid = 0;
2558                 process_lookup_list(r);
2559                 break;
2560
2561         case -EAGAIN:
2562         case -EBADR:
2563         case -ENOTBLK:
2564                 /* the remote request failed and won't be retried (it was
2565                    a NOQUEUE, or has been canceled/unlocked); make a waiting
2566                    lkb the first_lkid */
2567
2568                 r->res_first_lkid = 0;
2569
2570                 if (!list_empty(&r->res_lookup)) {
2571                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2572                                          lkb_rsb_lookup);
2573                         list_del_init(&lkb->lkb_rsb_lookup);
2574                         r->res_first_lkid = lkb->lkb_id;
2575                         _request_lock(r, lkb);
2576                 }
2577                 break;
2578
2579         default:
2580                 log_error(r->res_ls, "confirm_master unknown error %d", error);
2581         }
2582 }
2583
2584 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2585                          int namelen, void (*ast)(void *astparam),
2586                          void *astparam,
2587                          void (*bast)(void *astparam, int mode),
2588                          struct dlm_args *args)
2589 {
2590         int rv = -EINVAL;
2591
2592         /* check for invalid arg usage */
2593
2594         if (mode < 0 || mode > DLM_LOCK_EX)
2595                 goto out;
2596
2597         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2598                 goto out;
2599
2600         if (flags & DLM_LKF_CANCEL)
2601                 goto out;
2602
2603         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2604                 goto out;
2605
2606         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2607                 goto out;
2608
2609         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2610                 goto out;
2611
2612         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2613                 goto out;
2614
2615         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2616                 goto out;
2617
2618         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2619                 goto out;
2620
2621         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2622                 goto out;
2623
2624         if (!ast || !lksb)
2625                 goto out;
2626
2627         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2628                 goto out;
2629
2630         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2631                 goto out;
2632
2633         /* these args will be copied to the lkb in validate_lock_args,
2634            it cannot be done now because when converting locks, fields in
2635            an active lkb cannot be modified before locking the rsb */
2636
2637         args->flags = flags;
2638         args->astfn = ast;
2639         args->astparam = astparam;
2640         args->bastfn = bast;
2641         args->mode = mode;
2642         args->lksb = lksb;
2643         rv = 0;
2644  out:
2645         return rv;
2646 }
2647
2648 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2649 {
2650         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2651                       DLM_LKF_FORCEUNLOCK))
2652                 return -EINVAL;
2653
2654         if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2655                 return -EINVAL;
2656
2657         args->flags = flags;
2658         args->astparam = astarg;
2659         return 0;
2660 }
2661
2662 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2663                               struct dlm_args *args)
2664 {
2665         int rv = -EBUSY;
2666
2667         if (args->flags & DLM_LKF_CONVERT) {
2668                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2669                         goto out;
2670
2671                 /* lock not allowed if there's any op in progress */
2672                 if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2673                         goto out;
2674
2675                 if (is_overlap(lkb))
2676                         goto out;
2677
2678                 rv = -EINVAL;
2679                 if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
2680                         goto out;
2681
2682                 if (args->flags & DLM_LKF_QUECVT &&
2683                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2684                         goto out;
2685         }
2686
2687         lkb->lkb_exflags = args->flags;
2688         dlm_set_sbflags_val(lkb, 0);
2689         lkb->lkb_astfn = args->astfn;
2690         lkb->lkb_astparam = args->astparam;
2691         lkb->lkb_bastfn = args->bastfn;
2692         lkb->lkb_rqmode = args->mode;
2693         lkb->lkb_lksb = args->lksb;
2694         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2695         lkb->lkb_ownpid = (int) current->pid;
2696         rv = 0;
2697  out:
2698         switch (rv) {
2699         case 0:
2700                 break;
2701         case -EINVAL:
2702                 /* annoy the user because dlm usage is wrong */
2703                 WARN_ON(1);
2704                 log_error(ls, "%s %d %x %x %x %d %d %s", __func__,
2705                           rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2706                           lkb->lkb_status, lkb->lkb_wait_type,
2707                           lkb->lkb_resource->res_name);
2708                 break;
2709         default:
2710                 log_debug(ls, "%s %d %x %x %x %d %d %s", __func__,
2711                           rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2712                           lkb->lkb_status, lkb->lkb_wait_type,
2713                           lkb->lkb_resource->res_name);
2714                 break;
2715         }
2716
2717         return rv;
2718 }
2719
2720 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2721    for success */
2722
2723 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2724    because there may be a lookup in progress and it's valid to do
2725    cancel/unlockf on it */
2726
2727 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2728 {
2729         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2730         int rv = -EBUSY;
2731
2732         /* normal unlock not allowed if there's any op in progress */
2733         if (!(args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) &&
2734             (lkb->lkb_wait_type || lkb->lkb_wait_count))
2735                 goto out;
2736
2737         /* an lkb may be waiting for an rsb lookup to complete where the
2738            lookup was initiated by another lock */
2739
2740         if (!list_empty(&lkb->lkb_rsb_lookup)) {
2741                 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2742                         log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2743                         list_del_init(&lkb->lkb_rsb_lookup);
2744                         queue_cast(lkb->lkb_resource, lkb,
2745                                    args->flags & DLM_LKF_CANCEL ?
2746                                    -DLM_ECANCEL : -DLM_EUNLOCK);
2747                         unhold_lkb(lkb); /* undoes create_lkb() */
2748                 }
2749                 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2750                 goto out;
2751         }
2752
2753         rv = -EINVAL;
2754         if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) {
2755                 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2756                 dlm_print_lkb(lkb);
2757                 goto out;
2758         }
2759
2760         /* an lkb may still exist even though the lock is EOL'ed due to a
2761          * cancel, unlock or failed noqueue request; an app can't use these
2762          * locks; return same error as if the lkid had not been found at all
2763          */
2764
2765         if (test_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags)) {
2766                 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2767                 rv = -ENOENT;
2768                 goto out;
2769         }
2770
2771         /* cancel not allowed with another cancel/unlock in progress */
2772
2773         if (args->flags & DLM_LKF_CANCEL) {
2774                 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2775                         goto out;
2776
2777                 if (is_overlap(lkb))
2778                         goto out;
2779
2780                 if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
2781                         set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
2782                         rv = -EBUSY;
2783                         goto out;
2784                 }
2785
2786                 /* there's nothing to cancel */
2787                 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2788                     !lkb->lkb_wait_type) {
2789                         rv = -EBUSY;
2790                         goto out;
2791                 }
2792
2793                 switch (lkb->lkb_wait_type) {
2794                 case DLM_MSG_LOOKUP:
2795                 case DLM_MSG_REQUEST:
2796                         set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
2797                         rv = -EBUSY;
2798                         goto out;
2799                 case DLM_MSG_UNLOCK:
2800                 case DLM_MSG_CANCEL:
2801                         goto out;
2802                 }
2803                 /* add_to_waiters() will set OVERLAP_CANCEL */
2804                 goto out_ok;
2805         }
2806
2807         /* do we need to allow a force-unlock if there's a normal unlock
2808            already in progress?  in what conditions could the normal unlock
2809            fail such that we'd want to send a force-unlock to be sure? */
2810
2811         if (args->flags & DLM_LKF_FORCEUNLOCK) {
2812                 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2813                         goto out;
2814
2815                 if (is_overlap_unlock(lkb))
2816                         goto out;
2817
2818                 if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
2819                         set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
2820                         rv = -EBUSY;
2821                         goto out;
2822                 }
2823
2824                 switch (lkb->lkb_wait_type) {
2825                 case DLM_MSG_LOOKUP:
2826                 case DLM_MSG_REQUEST:
2827                         set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
2828                         rv = -EBUSY;
2829                         goto out;
2830                 case DLM_MSG_UNLOCK:
2831                         goto out;
2832                 }
2833                 /* add_to_waiters() will set OVERLAP_UNLOCK */
2834         }
2835
2836  out_ok:
2837         /* an overlapping op shouldn't blow away exflags from other op */
2838         lkb->lkb_exflags |= args->flags;
2839         dlm_set_sbflags_val(lkb, 0);
2840         lkb->lkb_astparam = args->astparam;
2841         rv = 0;
2842  out:
2843         switch (rv) {
2844         case 0:
2845                 break;
2846         case -EINVAL:
2847                 /* annoy the user because dlm usage is wrong */
2848                 WARN_ON(1);
2849                 log_error(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
2850                           lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
2851                           args->flags, lkb->lkb_wait_type,
2852                           lkb->lkb_resource->res_name);
2853                 break;
2854         default:
2855                 log_debug(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
2856                           lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
2857                           args->flags, lkb->lkb_wait_type,
2858                           lkb->lkb_resource->res_name);
2859                 break;
2860         }
2861
2862         return rv;
2863 }
2864
2865 /*
2866  * Four stage 4 varieties:
2867  * do_request(), do_convert(), do_unlock(), do_cancel()
2868  * These are called on the master node for the given lock and
2869  * from the central locking logic.
2870  */
2871
2872 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2873 {
2874         int error = 0;
2875
2876         if (can_be_granted(r, lkb, 1, 0, NULL)) {
2877                 grant_lock(r, lkb);
2878                 queue_cast(r, lkb, 0);
2879                 goto out;
2880         }
2881
2882         if (can_be_queued(lkb)) {
2883                 error = -EINPROGRESS;
2884                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2885                 goto out;
2886         }
2887
2888         error = -EAGAIN;
2889         queue_cast(r, lkb, -EAGAIN);
2890  out:
2891         return error;
2892 }
2893
2894 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2895                                int error)
2896 {
2897         switch (error) {
2898         case -EAGAIN:
2899                 if (force_blocking_asts(lkb))
2900                         send_blocking_asts_all(r, lkb);
2901                 break;
2902         case -EINPROGRESS:
2903                 send_blocking_asts(r, lkb);
2904                 break;
2905         }
2906 }
2907
2908 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2909 {
2910         int error = 0;
2911         int deadlk = 0;
2912
2913         /* changing an existing lock may allow others to be granted */
2914
2915         if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
2916                 grant_lock(r, lkb);
2917                 queue_cast(r, lkb, 0);
2918                 goto out;
2919         }
2920
2921         /* can_be_granted() detected that this lock would block in a conversion
2922            deadlock, so we leave it on the granted queue and return EDEADLK in
2923            the ast for the convert. */
2924
2925         if (deadlk && !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
2926                 /* it's left on the granted queue */
2927                 revert_lock(r, lkb);
2928                 queue_cast(r, lkb, -EDEADLK);
2929                 error = -EDEADLK;
2930                 goto out;
2931         }
2932
2933         /* is_demoted() means the can_be_granted() above set the grmode
2934            to NL, and left us on the granted queue.  This auto-demotion
2935            (due to CONVDEADLK) might mean other locks, and/or this lock, are
2936            now grantable.  We have to try to grant other converting locks
2937            before we try again to grant this one. */
2938
2939         if (is_demoted(lkb)) {
2940                 grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
2941                 if (_can_be_granted(r, lkb, 1, 0)) {
2942                         grant_lock(r, lkb);
2943                         queue_cast(r, lkb, 0);
2944                         goto out;
2945                 }
2946                 /* else fall through and move to convert queue */
2947         }
2948
2949         if (can_be_queued(lkb)) {
2950                 error = -EINPROGRESS;
2951                 del_lkb(r, lkb);
2952                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2953                 goto out;
2954         }
2955
2956         error = -EAGAIN;
2957         queue_cast(r, lkb, -EAGAIN);
2958  out:
2959         return error;
2960 }
2961
2962 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2963                                int error)
2964 {
2965         switch (error) {
2966         case 0:
2967                 grant_pending_locks(r, NULL);
2968                 /* grant_pending_locks also sends basts */
2969                 break;
2970         case -EAGAIN:
2971                 if (force_blocking_asts(lkb))
2972                         send_blocking_asts_all(r, lkb);
2973                 break;
2974         case -EINPROGRESS:
2975                 send_blocking_asts(r, lkb);
2976                 break;
2977         }
2978 }
2979
2980 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2981 {
2982         remove_lock(r, lkb);
2983         queue_cast(r, lkb, -DLM_EUNLOCK);
2984         return -DLM_EUNLOCK;
2985 }
2986
2987 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2988                               int error)
2989 {
2990         grant_pending_locks(r, NULL);
2991 }
2992
2993 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2994
2995 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2996 {
2997         int error;
2998
2999         error = revert_lock(r, lkb);
3000         if (error) {
3001                 queue_cast(r, lkb, -DLM_ECANCEL);
3002                 return -DLM_ECANCEL;
3003         }
3004         return 0;
3005 }
3006
3007 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3008                               int error)
3009 {
3010         if (error)
3011                 grant_pending_locks(r, NULL);
3012 }
3013
3014 /*
3015  * Four stage 3 varieties:
3016  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
3017  */
3018
3019 /* add a new lkb to a possibly new rsb, called by requesting process */
3020
3021 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3022 {
3023         int error;
3024
3025         /* set_master: sets lkb nodeid from r */
3026
3027         error = set_master(r, lkb);
3028         if (error < 0)
3029                 goto out;
3030         if (error) {
3031                 error = 0;
3032                 goto out;
3033         }
3034
3035         if (is_remote(r)) {
3036                 /* receive_request() calls do_request() on remote node */
3037                 error = send_request(r, lkb);
3038         } else {
3039                 error = do_request(r, lkb);
3040                 /* for remote locks the request_reply is sent
3041                    between do_request and do_request_effects */
3042                 do_request_effects(r, lkb, error);
3043         }
3044  out:
3045         return error;
3046 }
3047
3048 /* change some property of an existing lkb, e.g. mode */
3049
3050 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3051 {
3052         int error;
3053
3054         if (is_remote(r)) {
3055                 /* receive_convert() calls do_convert() on remote node */
3056                 error = send_convert(r, lkb);
3057         } else {
3058                 error = do_convert(r, lkb);
3059                 /* for remote locks the convert_reply is sent
3060                    between do_convert and do_convert_effects */
3061                 do_convert_effects(r, lkb, error);
3062         }
3063
3064         return error;
3065 }
3066
3067 /* remove an existing lkb from the granted queue */
3068
3069 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3070 {
3071         int error;
3072
3073         if (is_remote(r)) {
3074                 /* receive_unlock() calls do_unlock() on remote node */
3075                 error = send_unlock(r, lkb);
3076         } else {
3077                 error = do_unlock(r, lkb);
3078                 /* for remote locks the unlock_reply is sent
3079                    between do_unlock and do_unlock_effects */
3080                 do_unlock_effects(r, lkb, error);
3081         }
3082
3083         return error;
3084 }
3085
3086 /* remove an existing lkb from the convert or wait queue */
3087
3088 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3089 {
3090         int error;
3091
3092         if (is_remote(r)) {
3093                 /* receive_cancel() calls do_cancel() on remote node */
3094                 error = send_cancel(r, lkb);
3095         } else {
3096                 error = do_cancel(r, lkb);
3097                 /* for remote locks the cancel_reply is sent
3098                    between do_cancel and do_cancel_effects */
3099                 do_cancel_effects(r, lkb, error);
3100         }
3101
3102         return error;
3103 }
3104
3105 /*
3106  * Four stage 2 varieties:
3107  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
3108  */
3109
3110 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3111                         const void *name, int len,
3112                         struct dlm_args *args)
3113 {
3114         struct dlm_rsb *r;
3115         int error;
3116
3117         error = validate_lock_args(ls, lkb, args);
3118         if (error)
3119                 return error;
3120
3121         error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
3122         if (error)
3123                 return error;
3124
3125         lock_rsb(r);
3126
3127         attach_lkb(r, lkb);
3128         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
3129
3130         error = _request_lock(r, lkb);
3131
3132         unlock_rsb(r);
3133         put_rsb(r);
3134         return error;
3135 }
3136
3137 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3138                         struct dlm_args *args)
3139 {
3140         struct dlm_rsb *r;
3141         int error;
3142
3143         r = lkb->lkb_resource;
3144
3145         hold_rsb(r);
3146         lock_rsb(r);
3147
3148         error = validate_lock_args(ls, lkb, args);
3149         if (error)
3150                 goto out;
3151
3152         error = _convert_lock(r, lkb);
3153  out:
3154         unlock_rsb(r);
3155         put_rsb(r);
3156         return error;
3157 }
3158
3159 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3160                        struct dlm_args *args)
3161 {
3162         struct dlm_rsb *r;
3163         int error;
3164
3165         r = lkb->lkb_resource;
3166
3167         hold_rsb(r);
3168         lock_rsb(r);
3169
3170         error = validate_unlock_args(lkb, args);
3171         if (error)
3172                 goto out;
3173
3174         error = _unlock_lock(r, lkb);
3175  out:
3176         unlock_rsb(r);
3177         put_rsb(r);
3178         return error;
3179 }
3180
3181 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3182                        struct dlm_args *args)
3183 {
3184         struct dlm_rsb *r;
3185         int error;
3186
3187         r = lkb->lkb_resource;
3188
3189         hold_rsb(r);
3190         lock_rsb(r);
3191
3192         error = validate_unlock_args(lkb, args);
3193         if (error)
3194                 goto out;
3195
3196         error = _cancel_lock(r, lkb);
3197  out:
3198         unlock_rsb(r);
3199         put_rsb(r);
3200         return error;
3201 }
3202
3203 /*
3204  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
3205  */
3206
3207 int dlm_lock(dlm_lockspace_t *lockspace,
3208              int mode,
3209              struct dlm_lksb *lksb,
3210              uint32_t flags,
3211              const void *name,
3212              unsigned int namelen,
3213              uint32_t parent_lkid,
3214              void (*ast) (void *astarg),
3215              void *astarg,
3216              void (*bast) (void *astarg, int mode))
3217 {
3218         struct dlm_ls *ls;
3219         struct dlm_lkb *lkb;
3220         struct dlm_args args;
3221         int error, convert = flags & DLM_LKF_CONVERT;
3222
3223         ls = dlm_find_lockspace_local(lockspace);
3224         if (!ls)
3225                 return -EINVAL;
3226
3227         dlm_lock_recovery(ls);
3228
3229         if (convert)
3230                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
3231         else
3232                 error = create_lkb(ls, &lkb);
3233
3234         if (error)
3235                 goto out;
3236
3237         trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
3238
3239         error = set_lock_args(mode, lksb, flags, namelen, ast, astarg, bast,
3240                               &args);
3241         if (error)
3242                 goto out_put;
3243
3244         if (convert)
3245                 error = convert_lock(ls, lkb, &args);
3246         else
3247                 error = request_lock(ls, lkb, name, namelen, &args);
3248
3249         if (error == -EINPROGRESS)
3250                 error = 0;
3251  out_put:
3252         trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, true);
3253
3254         if (convert || error)
3255                 __put_lkb(ls, lkb);
3256         if (error == -EAGAIN || error == -EDEADLK)
3257                 error = 0;
3258  out:
3259         dlm_unlock_recovery(ls);
3260         dlm_put_lockspace(ls);
3261         return error;
3262 }
3263
3264 int dlm_unlock(dlm_lockspace_t *lockspace,
3265                uint32_t lkid,
3266                uint32_t flags,
3267                struct dlm_lksb *lksb,
3268                void *astarg)
3269 {
3270         struct dlm_ls *ls;
3271         struct dlm_lkb *lkb;
3272         struct dlm_args args;
3273         int error;
3274
3275         ls = dlm_find_lockspace_local(lockspace);
3276         if (!ls)
3277                 return -EINVAL;
3278
3279         dlm_lock_recovery(ls);
3280
3281         error = find_lkb(ls, lkid, &lkb);
3282         if (error)
3283                 goto out;
3284
3285         trace_dlm_unlock_start(ls, lkb, flags);
3286
3287         error = set_unlock_args(flags, astarg, &args);
3288         if (error)
3289                 goto out_put;
3290
3291         if (flags & DLM_LKF_CANCEL)
3292                 error = cancel_lock(ls, lkb, &args);
3293         else
3294                 error = unlock_lock(ls, lkb, &args);
3295
3296         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
3297                 error = 0;
3298         if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
3299                 error = 0;
3300  out_put:
3301         trace_dlm_unlock_end(ls, lkb, flags, error);
3302
3303         dlm_put_lkb(lkb);
3304  out:
3305         dlm_unlock_recovery(ls);
3306         dlm_put_lockspace(ls);
3307         return error;
3308 }
3309
3310 /*
3311  * send/receive routines for remote operations and replies
3312  *
3313  * send_args
3314  * send_common
3315  * send_request                 receive_request
3316  * send_convert                 receive_convert
3317  * send_unlock                  receive_unlock
3318  * send_cancel                  receive_cancel
3319  * send_grant                   receive_grant
3320  * send_bast                    receive_bast
3321  * send_lookup                  receive_lookup
3322  * send_remove                  receive_remove
3323  *
3324  *                              send_common_reply
3325  * receive_request_reply        send_request_reply
3326  * receive_convert_reply        send_convert_reply
3327  * receive_unlock_reply         send_unlock_reply
3328  * receive_cancel_reply         send_cancel_reply
3329  * receive_lookup_reply         send_lookup_reply
3330  */
3331
3332 static int _create_message(struct dlm_ls *ls, int mb_len,
3333                            int to_nodeid, int mstype,
3334                            struct dlm_message **ms_ret,
3335                            struct dlm_mhandle **mh_ret,
3336                            gfp_t allocation)
3337 {
3338         struct dlm_message *ms;
3339         struct dlm_mhandle *mh;
3340         char *mb;
3341
3342         /* get_buffer gives us a message handle (mh) that we need to
3343            pass into midcomms_commit and a message buffer (mb) that we
3344            write our data into */
3345
3346         mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, allocation, &mb);
3347         if (!mh)
3348                 return -ENOBUFS;
3349
3350         ms = (struct dlm_message *) mb;
3351
3352         ms->m_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
3353         ms->m_header.u.h_lockspace = cpu_to_le32(ls->ls_global_id);
3354         ms->m_header.h_nodeid = cpu_to_le32(dlm_our_nodeid());
3355         ms->m_header.h_length = cpu_to_le16(mb_len);
3356         ms->m_header.h_cmd = DLM_MSG;
3357
3358         ms->m_type = cpu_to_le32(mstype);
3359
3360         *mh_ret = mh;
3361         *ms_ret = ms;
3362         return 0;
3363 }
3364
3365 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
3366                           int to_nodeid, int mstype,
3367                           struct dlm_message **ms_ret,
3368                           struct dlm_mhandle **mh_ret,
3369                           gfp_t allocation)
3370 {
3371         int mb_len = sizeof(struct dlm_message);
3372
3373         switch (mstype) {
3374         case DLM_MSG_REQUEST:
3375         case DLM_MSG_LOOKUP:
3376         case DLM_MSG_REMOVE:
3377                 mb_len += r->res_length;
3378                 break;
3379         case DLM_MSG_CONVERT:
3380         case DLM_MSG_UNLOCK:
3381         case DLM_MSG_REQUEST_REPLY:
3382         case DLM_MSG_CONVERT_REPLY:
3383         case DLM_MSG_GRANT:
3384                 if (lkb && lkb->lkb_lvbptr && (lkb->lkb_exflags & DLM_LKF_VALBLK))
3385                         mb_len += r->res_ls->ls_lvblen;
3386                 break;
3387         }
3388
3389         return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
3390                                ms_ret, mh_ret, allocation);
3391 }
3392
3393 /* further lowcomms enhancements or alternate implementations may make
3394    the return value from this function useful at some point */
3395
3396 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms,
3397                         const void *name, int namelen)
3398 {
3399         dlm_midcomms_commit_mhandle(mh, name, namelen);
3400         return 0;
3401 }
3402
3403 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
3404                       struct dlm_message *ms)
3405 {
3406         ms->m_nodeid   = cpu_to_le32(lkb->lkb_nodeid);
3407         ms->m_pid      = cpu_to_le32(lkb->lkb_ownpid);
3408         ms->m_lkid     = cpu_to_le32(lkb->lkb_id);
3409         ms->m_remid    = cpu_to_le32(lkb->lkb_remid);
3410         ms->m_exflags  = cpu_to_le32(lkb->lkb_exflags);
3411         ms->m_sbflags  = cpu_to_le32(dlm_sbflags_val(lkb));
3412         ms->m_flags    = cpu_to_le32(dlm_dflags_val(lkb));
3413         ms->m_lvbseq   = cpu_to_le32(lkb->lkb_lvbseq);
3414         ms->m_status   = cpu_to_le32(lkb->lkb_status);
3415         ms->m_grmode   = cpu_to_le32(lkb->lkb_grmode);
3416         ms->m_rqmode   = cpu_to_le32(lkb->lkb_rqmode);
3417         ms->m_hash     = cpu_to_le32(r->res_hash);
3418
3419         /* m_result and m_bastmode are set from function args,
3420            not from lkb fields */
3421
3422         if (lkb->lkb_bastfn)
3423                 ms->m_asts |= cpu_to_le32(DLM_CB_BAST);
3424         if (lkb->lkb_astfn)
3425                 ms->m_asts |= cpu_to_le32(DLM_CB_CAST);
3426
3427         /* compare with switch in create_message; send_remove() doesn't
3428            use send_args() */
3429
3430         switch (ms->m_type) {
3431         case cpu_to_le32(DLM_MSG_REQUEST):
3432         case cpu_to_le32(DLM_MSG_LOOKUP):
3433                 memcpy(ms->m_extra, r->res_name, r->res_length);
3434                 break;
3435         case cpu_to_le32(DLM_MSG_CONVERT):
3436         case cpu_to_le32(DLM_MSG_UNLOCK):
3437         case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3438         case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3439         case cpu_to_le32(DLM_MSG_GRANT):
3440                 if (!lkb->lkb_lvbptr || !(lkb->lkb_exflags & DLM_LKF_VALBLK))
3441                         break;
3442                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
3443                 break;
3444         }
3445 }
3446
3447 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3448 {
3449         struct dlm_message *ms;
3450         struct dlm_mhandle *mh;
3451         int to_nodeid, error;
3452
3453         to_nodeid = r->res_nodeid;
3454
3455         error = add_to_waiters(lkb, mstype, to_nodeid);
3456         if (error)
3457                 return error;
3458
3459         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh, GFP_NOFS);
3460         if (error)
3461                 goto fail;
3462
3463         send_args(r, lkb, ms);
3464
3465         error = send_message(mh, ms, r->res_name, r->res_length);
3466         if (error)
3467                 goto fail;
3468         return 0;
3469
3470  fail:
3471         remove_from_waiters(lkb, msg_reply_type(mstype));
3472         return error;
3473 }
3474
3475 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3476 {
3477         return send_common(r, lkb, DLM_MSG_REQUEST);
3478 }
3479
3480 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3481 {
3482         int error;
3483
3484         error = send_common(r, lkb, DLM_MSG_CONVERT);
3485
3486         /* down conversions go without a reply from the master */
3487         if (!error && down_conversion(lkb)) {
3488                 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3489                 r->res_ls->ls_local_ms.m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
3490                 r->res_ls->ls_local_ms.m_result = 0;
3491                 __receive_convert_reply(r, lkb, &r->res_ls->ls_local_ms, true);
3492         }
3493
3494         return error;
3495 }
3496
3497 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
3498    MASTER_UNCERTAIN to force the next request on the rsb to confirm
3499    that the master is still correct. */
3500
3501 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3502 {
3503         return send_common(r, lkb, DLM_MSG_UNLOCK);
3504 }
3505
3506 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3507 {
3508         return send_common(r, lkb, DLM_MSG_CANCEL);
3509 }
3510
3511 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3512 {
3513         struct dlm_message *ms;
3514         struct dlm_mhandle *mh;
3515         int to_nodeid, error;
3516
3517         to_nodeid = lkb->lkb_nodeid;
3518
3519         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh,
3520                                GFP_NOFS);
3521         if (error)
3522                 goto out;
3523
3524         send_args(r, lkb, ms);
3525
3526         ms->m_result = 0;
3527
3528         error = send_message(mh, ms, r->res_name, r->res_length);
3529  out:
3530         return error;
3531 }
3532
3533 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3534 {
3535         struct dlm_message *ms;
3536         struct dlm_mhandle *mh;
3537         int to_nodeid, error;
3538
3539         to_nodeid = lkb->lkb_nodeid;
3540
3541         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh,
3542                                GFP_NOFS);
3543         if (error)
3544                 goto out;
3545
3546         send_args(r, lkb, ms);
3547
3548         ms->m_bastmode = cpu_to_le32(mode);
3549
3550         error = send_message(mh, ms, r->res_name, r->res_length);
3551  out:
3552         return error;
3553 }
3554
3555 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3556 {
3557         struct dlm_message *ms;
3558         struct dlm_mhandle *mh;
3559         int to_nodeid, error;
3560
3561         to_nodeid = dlm_dir_nodeid(r);
3562
3563         error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3564         if (error)
3565                 return error;
3566
3567         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh,
3568                                GFP_NOFS);
3569         if (error)
3570                 goto fail;
3571
3572         send_args(r, lkb, ms);
3573
3574         error = send_message(mh, ms, r->res_name, r->res_length);
3575         if (error)
3576                 goto fail;
3577         return 0;
3578
3579  fail:
3580         remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3581         return error;
3582 }
3583
3584 static int send_remove(struct dlm_rsb *r)
3585 {
3586         struct dlm_message *ms;
3587         struct dlm_mhandle *mh;
3588         int to_nodeid, error;
3589
3590         to_nodeid = dlm_dir_nodeid(r);
3591
3592         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh,
3593                                GFP_ATOMIC);
3594         if (error)
3595                 goto out;
3596
3597         memcpy(ms->m_extra, r->res_name, r->res_length);
3598         ms->m_hash = cpu_to_le32(r->res_hash);
3599
3600         error = send_message(mh, ms, r->res_name, r->res_length);
3601  out:
3602         return error;
3603 }
3604
3605 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3606                              int mstype, int rv)
3607 {
3608         struct dlm_message *ms;
3609         struct dlm_mhandle *mh;
3610         int to_nodeid, error;
3611
3612         to_nodeid = lkb->lkb_nodeid;
3613
3614         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh, GFP_NOFS);
3615         if (error)
3616                 goto out;
3617
3618         send_args(r, lkb, ms);
3619
3620         ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3621
3622         error = send_message(mh, ms, r->res_name, r->res_length);
3623  out:
3624         return error;
3625 }
3626
3627 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3628 {
3629         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3630 }
3631
3632 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3633 {
3634         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3635 }
3636
3637 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3638 {
3639         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3640 }
3641
3642 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3643 {
3644         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3645 }
3646
3647 static int send_lookup_reply(struct dlm_ls *ls,
3648                              const struct dlm_message *ms_in, int ret_nodeid,
3649                              int rv)
3650 {
3651         struct dlm_rsb *r = &ls->ls_local_rsb;
3652         struct dlm_message *ms;
3653         struct dlm_mhandle *mh;
3654         int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid);
3655
3656         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh,
3657                                GFP_NOFS);
3658         if (error)
3659                 goto out;
3660
3661         ms->m_lkid = ms_in->m_lkid;
3662         ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3663         ms->m_nodeid = cpu_to_le32(ret_nodeid);
3664
3665         error = send_message(mh, ms, ms_in->m_extra, receive_extralen(ms_in));
3666  out:
3667         return error;
3668 }
3669
3670 /* which args we save from a received message depends heavily on the type
3671    of message, unlike the send side where we can safely send everything about
3672    the lkb for any type of message */
3673
3674 static void receive_flags(struct dlm_lkb *lkb, const struct dlm_message *ms)
3675 {
3676         lkb->lkb_exflags = le32_to_cpu(ms->m_exflags);
3677         dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3678         dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3679 }
3680
3681 static void receive_flags_reply(struct dlm_lkb *lkb,
3682                                 const struct dlm_message *ms,
3683                                 bool local)
3684 {
3685         if (local)
3686                 return;
3687
3688         dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3689         dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3690 }
3691
3692 static int receive_extralen(const struct dlm_message *ms)
3693 {
3694         return (le16_to_cpu(ms->m_header.h_length) -
3695                 sizeof(struct dlm_message));
3696 }
3697
3698 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3699                        const struct dlm_message *ms)
3700 {
3701         int len;
3702
3703         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3704                 if (!lkb->lkb_lvbptr)
3705                         lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3706                 if (!lkb->lkb_lvbptr)
3707                         return -ENOMEM;
3708                 len = receive_extralen(ms);
3709                 if (len > ls->ls_lvblen)
3710                         len = ls->ls_lvblen;
3711                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3712         }
3713         return 0;
3714 }
3715
3716 static void fake_bastfn(void *astparam, int mode)
3717 {
3718         log_print("fake_bastfn should not be called");
3719 }
3720
3721 static void fake_astfn(void *astparam)
3722 {
3723         log_print("fake_astfn should not be called");
3724 }
3725
3726 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3727                                 const struct dlm_message *ms)
3728 {
3729         lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3730         lkb->lkb_ownpid = le32_to_cpu(ms->m_pid);
3731         lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3732         lkb->lkb_grmode = DLM_LOCK_IV;
3733         lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3734
3735         lkb->lkb_bastfn = (ms->m_asts & cpu_to_le32(DLM_CB_BAST)) ? &fake_bastfn : NULL;
3736         lkb->lkb_astfn = (ms->m_asts & cpu_to_le32(DLM_CB_CAST)) ? &fake_astfn : NULL;
3737
3738         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3739                 /* lkb was just created so there won't be an lvb yet */
3740                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3741                 if (!lkb->lkb_lvbptr)
3742                         return -ENOMEM;
3743         }
3744
3745         return 0;
3746 }
3747
3748 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3749                                 const struct dlm_message *ms)
3750 {
3751         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3752                 return -EBUSY;
3753
3754         if (receive_lvb(ls, lkb, ms))
3755                 return -ENOMEM;
3756
3757         lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3758         lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
3759
3760         return 0;
3761 }
3762
3763 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3764                                const struct dlm_message *ms)
3765 {
3766         if (receive_lvb(ls, lkb, ms))
3767                 return -ENOMEM;
3768         return 0;
3769 }
3770
3771 /* We fill in the local-lkb fields with the info that send_xxxx_reply()
3772    uses to send a reply and that the remote end uses to process the reply. */
3773
3774 static void setup_local_lkb(struct dlm_ls *ls, const struct dlm_message *ms)
3775 {
3776         struct dlm_lkb *lkb = &ls->ls_local_lkb;
3777         lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3778         lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3779 }
3780
3781 /* This is called after the rsb is locked so that we can safely inspect
3782    fields in the lkb. */
3783
3784 static int validate_message(struct dlm_lkb *lkb, const struct dlm_message *ms)
3785 {
3786         int from = le32_to_cpu(ms->m_header.h_nodeid);
3787         int error = 0;
3788
3789         /* currently mixing of user/kernel locks are not supported */
3790         if (ms->m_flags & cpu_to_le32(BIT(DLM_DFL_USER_BIT)) &&
3791             !test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) {
3792                 log_error(lkb->lkb_resource->res_ls,
3793                           "got user dlm message for a kernel lock");
3794                 error = -EINVAL;
3795                 goto out;
3796         }
3797
3798         switch (ms->m_type) {
3799         case cpu_to_le32(DLM_MSG_CONVERT):
3800         case cpu_to_le32(DLM_MSG_UNLOCK):
3801         case cpu_to_le32(DLM_MSG_CANCEL):
3802                 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3803                         error = -EINVAL;
3804                 break;
3805
3806         case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3807         case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
3808         case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
3809         case cpu_to_le32(DLM_MSG_GRANT):
3810         case cpu_to_le32(DLM_MSG_BAST):
3811                 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3812                         error = -EINVAL;
3813                 break;
3814
3815         case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3816                 if (!is_process_copy(lkb))
3817                         error = -EINVAL;
3818                 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3819                         error = -EINVAL;
3820                 break;
3821
3822         default:
3823                 error = -EINVAL;
3824         }
3825
3826 out:
3827         if (error)
3828                 log_error(lkb->lkb_resource->res_ls,
3829                           "ignore invalid message %d from %d %x %x %x %d",
3830                           le32_to_cpu(ms->m_type), from, lkb->lkb_id,
3831                           lkb->lkb_remid, dlm_iflags_val(lkb),
3832                           lkb->lkb_nodeid);
3833         return error;
3834 }
3835
3836 static int receive_request(struct dlm_ls *ls, const struct dlm_message *ms)
3837 {
3838         struct dlm_lkb *lkb;
3839         struct dlm_rsb *r;
3840         int from_nodeid;
3841         int error, namelen = 0;
3842
3843         from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3844
3845         error = create_lkb(ls, &lkb);
3846         if (error)
3847                 goto fail;
3848
3849         receive_flags(lkb, ms);
3850         set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
3851         error = receive_request_args(ls, lkb, ms);
3852         if (error) {
3853                 __put_lkb(ls, lkb);
3854                 goto fail;
3855         }
3856
3857         /* The dir node is the authority on whether we are the master
3858            for this rsb or not, so if the master sends us a request, we should
3859            recreate the rsb if we've destroyed it.   This race happens when we
3860            send a remove message to the dir node at the same time that the dir
3861            node sends us a request for the rsb. */
3862
3863         namelen = receive_extralen(ms);
3864
3865         error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
3866                          R_RECEIVE_REQUEST, &r);
3867         if (error) {
3868                 __put_lkb(ls, lkb);
3869                 goto fail;
3870         }
3871
3872         lock_rsb(r);
3873
3874         if (r->res_master_nodeid != dlm_our_nodeid()) {
3875                 error = validate_master_nodeid(ls, r, from_nodeid);
3876                 if (error) {
3877                         unlock_rsb(r);
3878                         put_rsb(r);
3879                         __put_lkb(ls, lkb);
3880                         goto fail;
3881                 }
3882         }
3883
3884         attach_lkb(r, lkb);
3885         error = do_request(r, lkb);
3886         send_request_reply(r, lkb, error);
3887         do_request_effects(r, lkb, error);
3888
3889         unlock_rsb(r);
3890         put_rsb(r);
3891
3892         if (error == -EINPROGRESS)
3893                 error = 0;
3894         if (error)
3895                 dlm_put_lkb(lkb);
3896         return 0;
3897
3898  fail:
3899         /* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
3900            and do this receive_request again from process_lookup_list once
3901            we get the lookup reply.  This would avoid a many repeated
3902            ENOTBLK request failures when the lookup reply designating us
3903            as master is delayed. */
3904
3905         if (error != -ENOTBLK) {
3906                 log_limit(ls, "receive_request %x from %d %d",
3907                           le32_to_cpu(ms->m_lkid), from_nodeid, error);
3908         }
3909
3910         setup_local_lkb(ls, ms);
3911         send_request_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
3912         return error;
3913 }
3914
3915 static int receive_convert(struct dlm_ls *ls, const struct dlm_message *ms)
3916 {
3917         struct dlm_lkb *lkb;
3918         struct dlm_rsb *r;
3919         int error, reply = 1;
3920
3921         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
3922         if (error)
3923                 goto fail;
3924
3925         if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
3926                 log_error(ls, "receive_convert %x remid %x recover_seq %llu "
3927                           "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
3928                           (unsigned long long)lkb->lkb_recover_seq,
3929                           le32_to_cpu(ms->m_header.h_nodeid),
3930                           le32_to_cpu(ms->m_lkid));
3931                 error = -ENOENT;
3932                 dlm_put_lkb(lkb);
3933                 goto fail;
3934         }
3935
3936         r = lkb->lkb_resource;
3937
3938         hold_rsb(r);
3939         lock_rsb(r);
3940
3941         error = validate_message(lkb, ms);
3942         if (error)
3943                 goto out;
3944
3945         receive_flags(lkb, ms);
3946
3947         error = receive_convert_args(ls, lkb, ms);
3948         if (error) {
3949                 send_convert_reply(r, lkb, error);
3950                 goto out;
3951         }
3952
3953         reply = !down_conversion(lkb);
3954
3955         error = do_convert(r, lkb);
3956         if (reply)
3957                 send_convert_reply(r, lkb, error);
3958         do_convert_effects(r, lkb, error);
3959  out:
3960         unlock_rsb(r);
3961         put_rsb(r);
3962         dlm_put_lkb(lkb);
3963         return 0;
3964
3965  fail:
3966         setup_local_lkb(ls, ms);
3967         send_convert_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
3968         return error;
3969 }
3970
3971 static int receive_unlock(struct dlm_ls *ls, const struct dlm_message *ms)
3972 {
3973         struct dlm_lkb *lkb;
3974         struct dlm_rsb *r;
3975         int error;
3976
3977         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
3978         if (error)
3979                 goto fail;
3980
3981         if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
3982                 log_error(ls, "receive_unlock %x remid %x remote %d %x",
3983                           lkb->lkb_id, lkb->lkb_remid,
3984                           le32_to_cpu(ms->m_header.h_nodeid),
3985                           le32_to_cpu(ms->m_lkid));
3986                 error = -ENOENT;
3987                 dlm_put_lkb(lkb);
3988                 goto fail;
3989         }
3990
3991         r = lkb->lkb_resource;
3992
3993         hold_rsb(r);
3994         lock_rsb(r);
3995
3996         error = validate_message(lkb, ms);
3997         if (error)
3998                 goto out;
3999
4000         receive_flags(lkb, ms);
4001
4002         error = receive_unlock_args(ls, lkb, ms);
4003         if (error) {
4004                 send_unlock_reply(r, lkb, error);
4005                 goto out;
4006         }
4007
4008         error = do_unlock(r, lkb);
4009         send_unlock_reply(r, lkb, error);
4010         do_unlock_effects(r, lkb, error);
4011  out:
4012         unlock_rsb(r);
4013         put_rsb(r);
4014         dlm_put_lkb(lkb);
4015         return 0;
4016
4017  fail:
4018         setup_local_lkb(ls, ms);
4019         send_unlock_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4020         return error;
4021 }
4022
4023 static int receive_cancel(struct dlm_ls *ls, const struct dlm_message *ms)
4024 {
4025         struct dlm_lkb *lkb;
4026         struct dlm_rsb *r;
4027         int error;
4028
4029         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4030         if (error)
4031                 goto fail;
4032
4033         receive_flags(lkb, ms);
4034
4035         r = lkb->lkb_resource;
4036
4037         hold_rsb(r);
4038         lock_rsb(r);
4039
4040         error = validate_message(lkb, ms);
4041         if (error)
4042                 goto out;
4043
4044         error = do_cancel(r, lkb);
4045         send_cancel_reply(r, lkb, error);
4046         do_cancel_effects(r, lkb, error);
4047  out:
4048         unlock_rsb(r);
4049         put_rsb(r);
4050         dlm_put_lkb(lkb);
4051         return 0;
4052
4053  fail:
4054         setup_local_lkb(ls, ms);
4055         send_cancel_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4056         return error;
4057 }
4058
4059 static int receive_grant(struct dlm_ls *ls, const struct dlm_message *ms)
4060 {
4061         struct dlm_lkb *lkb;
4062         struct dlm_rsb *r;
4063         int error;
4064
4065         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4066         if (error)
4067                 return error;
4068
4069         r = lkb->lkb_resource;
4070
4071         hold_rsb(r);
4072         lock_rsb(r);
4073
4074         error = validate_message(lkb, ms);
4075         if (error)
4076                 goto out;
4077
4078         receive_flags_reply(lkb, ms, false);
4079         if (is_altmode(lkb))
4080                 munge_altmode(lkb, ms);
4081         grant_lock_pc(r, lkb, ms);
4082         queue_cast(r, lkb, 0);
4083  out:
4084         unlock_rsb(r);
4085         put_rsb(r);
4086         dlm_put_lkb(lkb);
4087         return 0;
4088 }
4089
4090 static int receive_bast(struct dlm_ls *ls, const struct dlm_message *ms)
4091 {
4092         struct dlm_lkb *lkb;
4093         struct dlm_rsb *r;
4094         int error;
4095
4096         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4097         if (error)
4098                 return error;
4099
4100         r = lkb->lkb_resource;
4101
4102         hold_rsb(r);
4103         lock_rsb(r);
4104
4105         error = validate_message(lkb, ms);
4106         if (error)
4107                 goto out;
4108
4109         queue_bast(r, lkb, le32_to_cpu(ms->m_bastmode));
4110         lkb->lkb_highbast = le32_to_cpu(ms->m_bastmode);
4111  out:
4112         unlock_rsb(r);
4113         put_rsb(r);
4114         dlm_put_lkb(lkb);
4115         return 0;
4116 }
4117
4118 static void receive_lookup(struct dlm_ls *ls, const struct dlm_message *ms)
4119 {
4120         int len, error, ret_nodeid, from_nodeid, our_nodeid;
4121
4122         from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4123         our_nodeid = dlm_our_nodeid();
4124
4125         len = receive_extralen(ms);
4126
4127         error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
4128                                   &ret_nodeid, NULL);
4129
4130         /* Optimization: we're master so treat lookup as a request */
4131         if (!error && ret_nodeid == our_nodeid) {
4132                 receive_request(ls, ms);
4133                 return;
4134         }
4135         send_lookup_reply(ls, ms, ret_nodeid, error);
4136 }
4137
4138 static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
4139 {
4140         char name[DLM_RESNAME_MAXLEN+1];
4141         struct dlm_rsb *r;
4142         uint32_t hash, b;
4143         int rv, len, dir_nodeid, from_nodeid;
4144
4145         from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4146
4147         len = receive_extralen(ms);
4148
4149         if (len > DLM_RESNAME_MAXLEN) {
4150                 log_error(ls, "receive_remove from %d bad len %d",
4151                           from_nodeid, len);
4152                 return;
4153         }
4154
4155         dir_nodeid = dlm_hash2nodeid(ls, le32_to_cpu(ms->m_hash));
4156         if (dir_nodeid != dlm_our_nodeid()) {
4157                 log_error(ls, "receive_remove from %d bad nodeid %d",
4158                           from_nodeid, dir_nodeid);
4159                 return;
4160         }
4161
4162         /* Look for name on rsbtbl.toss, if it's there, kill it.
4163            If it's on rsbtbl.keep, it's being used, and we should ignore this
4164            message.  This is an expected race between the dir node sending a
4165            request to the master node at the same time as the master node sends
4166            a remove to the dir node.  The resolution to that race is for the
4167            dir node to ignore the remove message, and the master node to
4168            recreate the master rsb when it gets a request from the dir node for
4169            an rsb it doesn't have. */
4170
4171         memset(name, 0, sizeof(name));
4172         memcpy(name, ms->m_extra, len);
4173
4174         hash = jhash(name, len, 0);
4175         b = hash & (ls->ls_rsbtbl_size - 1);
4176
4177         spin_lock(&ls->ls_rsbtbl[b].lock);
4178
4179         rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
4180         if (rv) {
4181                 /* verify the rsb is on keep list per comment above */
4182                 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
4183                 if (rv) {
4184                         /* should not happen */
4185                         log_error(ls, "receive_remove from %d not found %s",
4186                                   from_nodeid, name);
4187                         spin_unlock(&ls->ls_rsbtbl[b].lock);
4188                         return;
4189                 }
4190                 if (r->res_master_nodeid != from_nodeid) {
4191                         /* should not happen */
4192                         log_error(ls, "receive_remove keep from %d master %d",
4193                                   from_nodeid, r->res_master_nodeid);
4194                         dlm_print_rsb(r);
4195                         spin_unlock(&ls->ls_rsbtbl[b].lock);
4196                         return;
4197                 }
4198
4199                 log_debug(ls, "receive_remove from %d master %d first %x %s",
4200                           from_nodeid, r->res_master_nodeid, r->res_first_lkid,
4201                           name);
4202                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4203                 return;
4204         }
4205
4206         if (r->res_master_nodeid != from_nodeid) {
4207                 log_error(ls, "receive_remove toss from %d master %d",
4208                           from_nodeid, r->res_master_nodeid);
4209                 dlm_print_rsb(r);
4210                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4211                 return;
4212         }
4213
4214         if (kref_put(&r->res_ref, kill_rsb)) {
4215                 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
4216                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4217                 dlm_free_rsb(r);
4218         } else {
4219                 log_error(ls, "receive_remove from %d rsb ref error",
4220                           from_nodeid);
4221                 dlm_print_rsb(r);
4222                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4223         }
4224 }
4225
4226 static void receive_purge(struct dlm_ls *ls, const struct dlm_message *ms)
4227 {
4228         do_purge(ls, le32_to_cpu(ms->m_nodeid), le32_to_cpu(ms->m_pid));
4229 }
4230
4231 static int receive_request_reply(struct dlm_ls *ls,
4232                                  const struct dlm_message *ms)
4233 {
4234         struct dlm_lkb *lkb;
4235         struct dlm_rsb *r;
4236         int error, mstype, result;
4237         int from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4238
4239         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4240         if (error)
4241                 return error;
4242
4243         r = lkb->lkb_resource;
4244         hold_rsb(r);
4245         lock_rsb(r);
4246
4247         error = validate_message(lkb, ms);
4248         if (error)
4249                 goto out;
4250
4251         mstype = lkb->lkb_wait_type;
4252         error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
4253         if (error) {
4254                 log_error(ls, "receive_request_reply %x remote %d %x result %d",
4255                           lkb->lkb_id, from_nodeid, le32_to_cpu(ms->m_lkid),
4256                           from_dlm_errno(le32_to_cpu(ms->m_result)));
4257                 dlm_dump_rsb(r);
4258                 goto out;
4259         }
4260
4261         /* Optimization: the dir node was also the master, so it took our
4262            lookup as a request and sent request reply instead of lookup reply */
4263         if (mstype == DLM_MSG_LOOKUP) {
4264                 r->res_master_nodeid = from_nodeid;
4265                 r->res_nodeid = from_nodeid;
4266                 lkb->lkb_nodeid = from_nodeid;
4267         }
4268
4269         /* this is the value returned from do_request() on the master */
4270         result = from_dlm_errno(le32_to_cpu(ms->m_result));
4271
4272         switch (result) {
4273         case -EAGAIN:
4274                 /* request would block (be queued) on remote master */
4275                 queue_cast(r, lkb, -EAGAIN);
4276                 confirm_master(r, -EAGAIN);
4277                 unhold_lkb(lkb); /* undoes create_lkb() */
4278                 break;
4279
4280         case -EINPROGRESS:
4281         case 0:
4282                 /* request was queued or granted on remote master */
4283                 receive_flags_reply(lkb, ms, false);
4284                 lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
4285                 if (is_altmode(lkb))
4286                         munge_altmode(lkb, ms);
4287                 if (result) {
4288                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
4289                 } else {
4290                         grant_lock_pc(r, lkb, ms);
4291                         queue_cast(r, lkb, 0);
4292                 }
4293                 confirm_master(r, result);
4294                 break;
4295
4296         case -EBADR:
4297         case -ENOTBLK:
4298                 /* find_rsb failed to find rsb or rsb wasn't master */
4299                 log_limit(ls, "receive_request_reply %x from %d %d "
4300                           "master %d dir %d first %x %s", lkb->lkb_id,
4301                           from_nodeid, result, r->res_master_nodeid,
4302                           r->res_dir_nodeid, r->res_first_lkid, r->res_name);
4303
4304                 if (r->res_dir_nodeid != dlm_our_nodeid() &&
4305                     r->res_master_nodeid != dlm_our_nodeid()) {
4306                         /* cause _request_lock->set_master->send_lookup */
4307                         r->res_master_nodeid = 0;
4308                         r->res_nodeid = -1;
4309                         lkb->lkb_nodeid = -1;
4310                 }
4311
4312                 if (is_overlap(lkb)) {
4313                         /* we'll ignore error in cancel/unlock reply */
4314                         queue_cast_overlap(r, lkb);
4315                         confirm_master(r, result);
4316                         unhold_lkb(lkb); /* undoes create_lkb() */
4317                 } else {
4318                         _request_lock(r, lkb);
4319
4320                         if (r->res_master_nodeid == dlm_our_nodeid())
4321                                 confirm_master(r, 0);
4322                 }
4323                 break;
4324
4325         default:
4326                 log_error(ls, "receive_request_reply %x error %d",
4327                           lkb->lkb_id, result);
4328         }
4329
4330         if ((result == 0 || result == -EINPROGRESS) &&
4331             test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
4332                 log_debug(ls, "receive_request_reply %x result %d unlock",
4333                           lkb->lkb_id, result);
4334                 clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4335                 send_unlock(r, lkb);
4336         } else if ((result == -EINPROGRESS) &&
4337                    test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
4338                                       &lkb->lkb_iflags)) {
4339                 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
4340                 clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4341                 send_cancel(r, lkb);
4342         } else {
4343                 clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4344                 clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4345         }
4346  out:
4347         unlock_rsb(r);
4348         put_rsb(r);
4349         dlm_put_lkb(lkb);
4350         return 0;
4351 }
4352
4353 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4354                                     const struct dlm_message *ms, bool local)
4355 {
4356         /* this is the value returned from do_convert() on the master */
4357         switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4358         case -EAGAIN:
4359                 /* convert would block (be queued) on remote master */
4360                 queue_cast(r, lkb, -EAGAIN);
4361                 break;
4362
4363         case -EDEADLK:
4364                 receive_flags_reply(lkb, ms, local);
4365                 revert_lock_pc(r, lkb);
4366                 queue_cast(r, lkb, -EDEADLK);
4367                 break;
4368
4369         case -EINPROGRESS:
4370                 /* convert was queued on remote master */
4371                 receive_flags_reply(lkb, ms, local);
4372                 if (is_demoted(lkb))
4373                         munge_demoted(lkb);
4374                 del_lkb(r, lkb);
4375                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
4376                 break;
4377
4378         case 0:
4379                 /* convert was granted on remote master */
4380                 receive_flags_reply(lkb, ms, local);
4381                 if (is_demoted(lkb))
4382                         munge_demoted(lkb);
4383                 grant_lock_pc(r, lkb, ms);
4384                 queue_cast(r, lkb, 0);
4385                 break;
4386
4387         default:
4388                 log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
4389                           lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4390                           le32_to_cpu(ms->m_lkid),
4391                           from_dlm_errno(le32_to_cpu(ms->m_result)));
4392                 dlm_print_rsb(r);
4393                 dlm_print_lkb(lkb);
4394         }
4395 }
4396
4397 static void _receive_convert_reply(struct dlm_lkb *lkb,
4398                                    const struct dlm_message *ms, bool local)
4399 {
4400         struct dlm_rsb *r = lkb->lkb_resource;
4401         int error;
4402
4403         hold_rsb(r);
4404         lock_rsb(r);
4405
4406         error = validate_message(lkb, ms);
4407         if (error)
4408                 goto out;
4409
4410         /* local reply can happen with waiters_mutex held */
4411         error = remove_from_waiters_ms(lkb, ms, local);
4412         if (error)
4413                 goto out;
4414
4415         __receive_convert_reply(r, lkb, ms, local);
4416  out:
4417         unlock_rsb(r);
4418         put_rsb(r);
4419 }
4420
4421 static int receive_convert_reply(struct dlm_ls *ls,
4422                                  const struct dlm_message *ms)
4423 {
4424         struct dlm_lkb *lkb;
4425         int error;
4426
4427         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4428         if (error)
4429                 return error;
4430
4431         _receive_convert_reply(lkb, ms, false);
4432         dlm_put_lkb(lkb);
4433         return 0;
4434 }
4435
4436 static void _receive_unlock_reply(struct dlm_lkb *lkb,
4437                                   const struct dlm_message *ms, bool local)
4438 {
4439         struct dlm_rsb *r = lkb->lkb_resource;
4440         int error;
4441
4442         hold_rsb(r);
4443         lock_rsb(r);
4444
4445         error = validate_message(lkb, ms);
4446         if (error)
4447                 goto out;
4448
4449         /* local reply can happen with waiters_mutex held */
4450         error = remove_from_waiters_ms(lkb, ms, local);
4451         if (error)
4452                 goto out;
4453
4454         /* this is the value returned from do_unlock() on the master */
4455
4456         switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4457         case -DLM_EUNLOCK:
4458                 receive_flags_reply(lkb, ms, local);
4459                 remove_lock_pc(r, lkb);
4460                 queue_cast(r, lkb, -DLM_EUNLOCK);
4461                 break;
4462         case -ENOENT:
4463                 break;
4464         default:
4465                 log_error(r->res_ls, "receive_unlock_reply %x error %d",
4466                           lkb->lkb_id, from_dlm_errno(le32_to_cpu(ms->m_result)));
4467         }
4468  out:
4469         unlock_rsb(r);
4470         put_rsb(r);
4471 }
4472
4473 static int receive_unlock_reply(struct dlm_ls *ls,
4474                                 const struct dlm_message *ms)
4475 {
4476         struct dlm_lkb *lkb;
4477         int error;
4478
4479         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4480         if (error)
4481                 return error;
4482
4483         _receive_unlock_reply(lkb, ms, false);
4484         dlm_put_lkb(lkb);
4485         return 0;
4486 }
4487
4488 static void _receive_cancel_reply(struct dlm_lkb *lkb,
4489                                   const struct dlm_message *ms, bool local)
4490 {
4491         struct dlm_rsb *r = lkb->lkb_resource;
4492         int error;
4493
4494         hold_rsb(r);
4495         lock_rsb(r);
4496
4497         error = validate_message(lkb, ms);
4498         if (error)
4499                 goto out;
4500
4501         /* local reply can happen with waiters_mutex held */
4502         error = remove_from_waiters_ms(lkb, ms, local);
4503         if (error)
4504                 goto out;
4505
4506         /* this is the value returned from do_cancel() on the master */
4507
4508         switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4509         case -DLM_ECANCEL:
4510                 receive_flags_reply(lkb, ms, local);
4511                 revert_lock_pc(r, lkb);
4512                 queue_cast(r, lkb, -DLM_ECANCEL);
4513                 break;
4514         case 0:
4515                 break;
4516         default:
4517                 log_error(r->res_ls, "receive_cancel_reply %x error %d",
4518                           lkb->lkb_id,
4519                           from_dlm_errno(le32_to_cpu(ms->m_result)));
4520         }
4521  out:
4522         unlock_rsb(r);
4523         put_rsb(r);
4524 }
4525
4526 static int receive_cancel_reply(struct dlm_ls *ls,
4527                                 const struct dlm_message *ms)
4528 {
4529         struct dlm_lkb *lkb;
4530         int error;
4531
4532         error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4533         if (error)
4534                 return error;
4535
4536         _receive_cancel_reply(lkb, ms, false);
4537         dlm_put_lkb(lkb);
4538         return 0;
4539 }
4540
4541 static void receive_lookup_reply(struct dlm_ls *ls,
4542                                  const struct dlm_message *ms)
4543 {
4544         struct dlm_lkb *lkb;
4545         struct dlm_rsb *r;
4546         int error, ret_nodeid;
4547         int do_lookup_list = 0;
4548
4549         error = find_lkb(ls, le32_to_cpu(ms->m_lkid), &lkb);
4550         if (error) {
4551                 log_error(ls, "%s no lkid %x", __func__,
4552                           le32_to_cpu(ms->m_lkid));
4553                 return;
4554         }
4555
4556         /* ms->m_result is the value returned by dlm_master_lookup on dir node
4557            FIXME: will a non-zero error ever be returned? */
4558
4559         r = lkb->lkb_resource;
4560         hold_rsb(r);
4561         lock_rsb(r);
4562
4563         error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
4564         if (error)
4565                 goto out;
4566
4567         ret_nodeid = le32_to_cpu(ms->m_nodeid);
4568
4569         /* We sometimes receive a request from the dir node for this
4570            rsb before we've received the dir node's loookup_reply for it.
4571            The request from the dir node implies we're the master, so we set
4572            ourself as master in receive_request_reply, and verify here that
4573            we are indeed the master. */
4574
4575         if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
4576                 /* This should never happen */
4577                 log_error(ls, "receive_lookup_reply %x from %d ret %d "
4578                           "master %d dir %d our %d first %x %s",
4579                           lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4580                           ret_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
4581                           dlm_our_nodeid(), r->res_first_lkid, r->res_name);
4582         }
4583
4584         if (ret_nodeid == dlm_our_nodeid()) {
4585                 r->res_master_nodeid = ret_nodeid;
4586                 r->res_nodeid = 0;
4587                 do_lookup_list = 1;
4588                 r->res_first_lkid = 0;
4589         } else if (ret_nodeid == -1) {
4590                 /* the remote node doesn't believe it's the dir node */
4591                 log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
4592                           lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid));
4593                 r->res_master_nodeid = 0;
4594                 r->res_nodeid = -1;
4595                 lkb->lkb_nodeid = -1;
4596         } else {
4597                 /* set_master() will set lkb_nodeid from r */
4598                 r->res_master_nodeid = ret_nodeid;
4599                 r->res_nodeid = ret_nodeid;
4600         }
4601
4602         if (is_overlap(lkb)) {
4603                 log_debug(ls, "receive_lookup_reply %x unlock %x",
4604                           lkb->lkb_id, dlm_iflags_val(lkb));
4605                 queue_cast_overlap(r, lkb);
4606                 unhold_lkb(lkb); /* undoes create_lkb() */
4607                 goto out_list;
4608         }
4609
4610         _request_lock(r, lkb);
4611
4612  out_list:
4613         if (do_lookup_list)
4614                 process_lookup_list(r);
4615  out:
4616         unlock_rsb(r);
4617         put_rsb(r);
4618         dlm_put_lkb(lkb);
4619 }
4620
4621 static void _receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
4622                              uint32_t saved_seq)
4623 {
4624         int error = 0, noent = 0;
4625
4626         if (WARN_ON_ONCE(!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid)))) {
4627                 log_limit(ls, "receive %d from non-member %d %x %x %d",
4628                           le32_to_cpu(ms->m_type),
4629                           le32_to_cpu(ms->m_header.h_nodeid),
4630                           le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4631                           from_dlm_errno(le32_to_cpu(ms->m_result)));
4632                 return;
4633         }
4634
4635         switch (ms->m_type) {
4636
4637         /* messages sent to a master node */
4638
4639         case cpu_to_le32(DLM_MSG_REQUEST):
4640                 error = receive_request(ls, ms);
4641                 break;
4642
4643         case cpu_to_le32(DLM_MSG_CONVERT):
4644                 error = receive_convert(ls, ms);
4645                 break;
4646
4647         case cpu_to_le32(DLM_MSG_UNLOCK):
4648                 error = receive_unlock(ls, ms);
4649                 break;
4650
4651         case cpu_to_le32(DLM_MSG_CANCEL):
4652                 noent = 1;
4653                 error = receive_cancel(ls, ms);
4654                 break;
4655
4656         /* messages sent from a master node (replies to above) */
4657
4658         case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
4659                 error = receive_request_reply(ls, ms);
4660                 break;
4661
4662         case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
4663                 error = receive_convert_reply(ls, ms);
4664                 break;
4665
4666         case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
4667                 error = receive_unlock_reply(ls, ms);
4668                 break;
4669
4670         case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
4671                 error = receive_cancel_reply(ls, ms);
4672                 break;
4673
4674         /* messages sent from a master node (only two types of async msg) */
4675
4676         case cpu_to_le32(DLM_MSG_GRANT):
4677                 noent = 1;
4678                 error = receive_grant(ls, ms);
4679                 break;
4680
4681         case cpu_to_le32(DLM_MSG_BAST):
4682                 noent = 1;
4683                 error = receive_bast(ls, ms);
4684                 break;
4685
4686         /* messages sent to a dir node */
4687
4688         case cpu_to_le32(DLM_MSG_LOOKUP):
4689                 receive_lookup(ls, ms);
4690                 break;
4691
4692         case cpu_to_le32(DLM_MSG_REMOVE):
4693                 receive_remove(ls, ms);
4694                 break;
4695
4696         /* messages sent from a dir node (remove has no reply) */
4697
4698         case cpu_to_le32(DLM_MSG_LOOKUP_REPLY):
4699                 receive_lookup_reply(ls, ms);
4700                 break;
4701
4702         /* other messages */
4703
4704         case cpu_to_le32(DLM_MSG_PURGE):
4705                 receive_purge(ls, ms);
4706                 break;
4707
4708         default:
4709                 log_error(ls, "unknown message type %d",
4710                           le32_to_cpu(ms->m_type));
4711         }
4712
4713         /*
4714          * When checking for ENOENT, we're checking the result of
4715          * find_lkb(m_remid):
4716          *
4717          * The lock id referenced in the message wasn't found.  This may
4718          * happen in normal usage for the async messages and cancel, so
4719          * only use log_debug for them.
4720          *
4721          * Some errors are expected and normal.
4722          */
4723
4724         if (error == -ENOENT && noent) {
4725                 log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
4726                           le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4727                           le32_to_cpu(ms->m_header.h_nodeid),
4728                           le32_to_cpu(ms->m_lkid), saved_seq);
4729         } else if (error == -ENOENT) {
4730                 log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
4731                           le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4732                           le32_to_cpu(ms->m_header.h_nodeid),
4733                           le32_to_cpu(ms->m_lkid), saved_seq);
4734
4735                 if (ms->m_type == cpu_to_le32(DLM_MSG_CONVERT))
4736                         dlm_dump_rsb_hash(ls, le32_to_cpu(ms->m_hash));
4737         }
4738
4739         if (error == -EINVAL) {
4740                 log_error(ls, "receive %d inval from %d lkid %x remid %x "
4741                           "saved_seq %u",
4742                           le32_to_cpu(ms->m_type),
4743                           le32_to_cpu(ms->m_header.h_nodeid),
4744                           le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4745                           saved_seq);
4746         }
4747 }
4748
4749 /* If the lockspace is in recovery mode (locking stopped), then normal
4750    messages are saved on the requestqueue for processing after recovery is
4751    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
4752    messages off the requestqueue before we process new ones. This occurs right
4753    after recovery completes when we transition from saving all messages on
4754    requestqueue, to processing all the saved messages, to processing new
4755    messages as they arrive. */
4756
4757 static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
4758                                 int nodeid)
4759 {
4760         if (dlm_locking_stopped(ls)) {
4761                 /* If we were a member of this lockspace, left, and rejoined,
4762                    other nodes may still be sending us messages from the
4763                    lockspace generation before we left. */
4764                 if (WARN_ON_ONCE(!ls->ls_generation)) {
4765                         log_limit(ls, "receive %d from %d ignore old gen",
4766                                   le32_to_cpu(ms->m_type), nodeid);
4767                         return;
4768                 }
4769
4770                 dlm_add_requestqueue(ls, nodeid, ms);
4771         } else {
4772                 dlm_wait_requestqueue(ls);
4773                 _receive_message(ls, ms, 0);
4774         }
4775 }
4776
4777 /* This is called by dlm_recoverd to process messages that were saved on
4778    the requestqueue. */
4779
4780 void dlm_receive_message_saved(struct dlm_ls *ls, const struct dlm_message *ms,
4781                                uint32_t saved_seq)
4782 {
4783         _receive_message(ls, ms, saved_seq);
4784 }
4785
4786 /* This is called by the midcomms layer when something is received for
4787    the lockspace.  It could be either a MSG (normal message sent as part of
4788    standard locking activity) or an RCOM (recovery message sent as part of
4789    lockspace recovery). */
4790
4791 void dlm_receive_buffer(const union dlm_packet *p, int nodeid)
4792 {
4793         const struct dlm_header *hd = &p->header;
4794         struct dlm_ls *ls;
4795         int type = 0;
4796
4797         switch (hd->h_cmd) {
4798         case DLM_MSG:
4799                 type = le32_to_cpu(p->message.m_type);
4800                 break;
4801         case DLM_RCOM:
4802                 type = le32_to_cpu(p->rcom.rc_type);
4803                 break;
4804         default:
4805                 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
4806                 return;
4807         }
4808
4809         if (le32_to_cpu(hd->h_nodeid) != nodeid) {
4810                 log_print("invalid h_nodeid %d from %d lockspace %x",
4811                           le32_to_cpu(hd->h_nodeid), nodeid,
4812                           le32_to_cpu(hd->u.h_lockspace));
4813                 return;
4814         }
4815
4816         ls = dlm_find_lockspace_global(le32_to_cpu(hd->u.h_lockspace));
4817         if (!ls) {
4818                 if (dlm_config.ci_log_debug) {
4819                         printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
4820                                 "%u from %d cmd %d type %d\n",
4821                                 le32_to_cpu(hd->u.h_lockspace), nodeid,
4822                                 hd->h_cmd, type);
4823                 }
4824
4825                 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
4826                         dlm_send_ls_not_ready(nodeid, &p->rcom);
4827                 return;
4828         }
4829
4830         /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
4831            be inactive (in this ls) before transitioning to recovery mode */
4832
4833         down_read(&ls->ls_recv_active);
4834         if (hd->h_cmd == DLM_MSG)
4835                 dlm_receive_message(ls, &p->message, nodeid);
4836         else if (hd->h_cmd == DLM_RCOM)
4837                 dlm_receive_rcom(ls, &p->rcom, nodeid);
4838         else
4839                 log_error(ls, "invalid h_cmd %d from %d lockspace %x",
4840                           hd->h_cmd, nodeid, le32_to_cpu(hd->u.h_lockspace));
4841         up_read(&ls->ls_recv_active);
4842
4843         dlm_put_lockspace(ls);
4844 }
4845
4846 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
4847                                    struct dlm_message *ms_local)
4848 {
4849         if (middle_conversion(lkb)) {
4850                 hold_lkb(lkb);
4851                 memset(ms_local, 0, sizeof(struct dlm_message));
4852                 ms_local->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
4853                 ms_local->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS));
4854                 ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
4855                 _receive_convert_reply(lkb, ms_local, true);
4856
4857                 /* Same special case as in receive_rcom_lock_args() */
4858                 lkb->lkb_grmode = DLM_LOCK_IV;
4859                 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
4860                 unhold_lkb(lkb);
4861
4862         } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
4863                 set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
4864         }
4865
4866         /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
4867            conversions are async; there's no reply from the remote master */
4868 }
4869
4870 /* A waiting lkb needs recovery if the master node has failed, or
4871    the master node is changing (only when no directory is used) */
4872
4873 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
4874                                  int dir_nodeid)
4875 {
4876         if (dlm_no_directory(ls))
4877                 return 1;
4878
4879         if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
4880                 return 1;
4881
4882         return 0;
4883 }
4884
4885 /* Recovery for locks that are waiting for replies from nodes that are now
4886    gone.  We can just complete unlocks and cancels by faking a reply from the
4887    dead node.  Requests and up-conversions we flag to be resent after
4888    recovery.  Down-conversions can just be completed with a fake reply like
4889    unlocks.  Conversions between PR and CW need special attention. */
4890
4891 void dlm_recover_waiters_pre(struct dlm_ls *ls)
4892 {
4893         struct dlm_lkb *lkb, *safe;
4894         struct dlm_message *ms_local;
4895         int wait_type, local_unlock_result, local_cancel_result;
4896         int dir_nodeid;
4897
4898         ms_local = kmalloc(sizeof(*ms_local), GFP_KERNEL);
4899         if (!ms_local)
4900                 return;
4901
4902         mutex_lock(&ls->ls_waiters_mutex);
4903
4904         list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
4905
4906                 dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
4907
4908                 /* exclude debug messages about unlocks because there can be so
4909                    many and they aren't very interesting */
4910
4911                 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
4912                         log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
4913                                   "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
4914                                   lkb->lkb_id,
4915                                   lkb->lkb_remid,
4916                                   lkb->lkb_wait_type,
4917                                   lkb->lkb_resource->res_nodeid,
4918                                   lkb->lkb_nodeid,
4919                                   lkb->lkb_wait_nodeid,
4920                                   dir_nodeid);
4921                 }
4922
4923                 /* all outstanding lookups, regardless of destination  will be
4924                    resent after recovery is done */
4925
4926                 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
4927                         set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
4928                         continue;
4929                 }
4930
4931                 if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
4932                         continue;
4933
4934                 wait_type = lkb->lkb_wait_type;
4935                 local_unlock_result = -DLM_EUNLOCK;
4936                 local_cancel_result = -DLM_ECANCEL;
4937
4938                 /* Main reply may have been received leaving a zero wait_type,
4939                    but a reply for the overlapping op may not have been
4940                    received.  In that case we need to fake the appropriate
4941                    reply for the overlap op. */
4942
4943                 if (!wait_type) {
4944                         if (is_overlap_cancel(lkb)) {
4945                                 wait_type = DLM_MSG_CANCEL;
4946                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
4947                                         local_cancel_result = 0;
4948                         }
4949                         if (is_overlap_unlock(lkb)) {
4950                                 wait_type = DLM_MSG_UNLOCK;
4951                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
4952                                         local_unlock_result = -ENOENT;
4953                         }
4954
4955                         log_debug(ls, "rwpre overlap %x %x %d %d %d",
4956                                   lkb->lkb_id, dlm_iflags_val(lkb), wait_type,
4957                                   local_cancel_result, local_unlock_result);
4958                 }
4959
4960                 switch (wait_type) {
4961
4962                 case DLM_MSG_REQUEST:
4963                         set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
4964                         break;
4965
4966                 case DLM_MSG_CONVERT:
4967                         recover_convert_waiter(ls, lkb, ms_local);
4968                         break;
4969
4970                 case DLM_MSG_UNLOCK:
4971                         hold_lkb(lkb);
4972                         memset(ms_local, 0, sizeof(struct dlm_message));
4973                         ms_local->m_type = cpu_to_le32(DLM_MSG_UNLOCK_REPLY);
4974                         ms_local->m_result = cpu_to_le32(to_dlm_errno(local_unlock_result));
4975                         ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
4976                         _receive_unlock_reply(lkb, ms_local, true);
4977                         dlm_put_lkb(lkb);
4978                         break;
4979
4980                 case DLM_MSG_CANCEL:
4981                         hold_lkb(lkb);
4982                         memset(ms_local, 0, sizeof(struct dlm_message));
4983                         ms_local->m_type = cpu_to_le32(DLM_MSG_CANCEL_REPLY);
4984                         ms_local->m_result = cpu_to_le32(to_dlm_errno(local_cancel_result));
4985                         ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
4986                         _receive_cancel_reply(lkb, ms_local, true);
4987                         dlm_put_lkb(lkb);
4988                         break;
4989
4990                 default:
4991                         log_error(ls, "invalid lkb wait_type %d %d",
4992                                   lkb->lkb_wait_type, wait_type);
4993                 }
4994                 schedule();
4995         }
4996         mutex_unlock(&ls->ls_waiters_mutex);
4997         kfree(ms_local);
4998 }
4999
5000 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
5001 {
5002         struct dlm_lkb *lkb = NULL, *iter;
5003
5004         mutex_lock(&ls->ls_waiters_mutex);
5005         list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) {
5006                 if (test_bit(DLM_IFL_RESEND_BIT, &iter->lkb_iflags)) {
5007                         hold_lkb(iter);
5008                         lkb = iter;
5009                         break;
5010                 }
5011         }
5012         mutex_unlock(&ls->ls_waiters_mutex);
5013
5014         return lkb;
5015 }
5016
5017 /*
5018  * Forced state reset for locks that were in the middle of remote operations
5019  * when recovery happened (i.e. lkbs that were on the waiters list, waiting
5020  * for a reply from a remote operation.)  The lkbs remaining on the waiters
5021  * list need to be reevaluated; some may need resending to a different node
5022  * than previously, and some may now need local handling rather than remote.
5023  *
5024  * First, the lkb state for the voided remote operation is forcibly reset,
5025  * equivalent to what remove_from_waiters() would normally do:
5026  * . lkb removed from ls_waiters list
5027  * . lkb wait_type cleared
5028  * . lkb waiters_count cleared
5029  * . lkb ref count decremented for each waiters_count (almost always 1,
5030  *   but possibly 2 in case of cancel/unlock overlapping, which means
5031  *   two remote replies were being expected for the lkb.)
5032  *
5033  * Second, the lkb is reprocessed like an original operation would be,
5034  * by passing it to _request_lock or _convert_lock, which will either
5035  * process the lkb operation locally, or send it to a remote node again
5036  * and put the lkb back onto the waiters list.
5037  *
5038  * When reprocessing the lkb, we may find that it's flagged for an overlapping
5039  * force-unlock or cancel, either from before recovery began, or after recovery
5040  * finished.  If this is the case, the unlock/cancel is done directly, and the
5041  * original operation is not initiated again (no _request_lock/_convert_lock.)
5042  */
5043
5044 int dlm_recover_waiters_post(struct dlm_ls *ls)
5045 {
5046         struct dlm_lkb *lkb;
5047         struct dlm_rsb *r;
5048         int error = 0, mstype, err, oc, ou;
5049
5050         while (1) {
5051                 if (dlm_locking_stopped(ls)) {
5052                         log_debug(ls, "recover_waiters_post aborted");
5053                         error = -EINTR;
5054                         break;
5055                 }
5056
5057                 /* 
5058                  * Find an lkb from the waiters list that's been affected by
5059                  * recovery node changes, and needs to be reprocessed.  Does
5060                  * hold_lkb(), adding a refcount.
5061                  */
5062                 lkb = find_resend_waiter(ls);
5063                 if (!lkb)
5064                         break;
5065
5066                 r = lkb->lkb_resource;
5067                 hold_rsb(r);
5068                 lock_rsb(r);
5069
5070                 /*
5071                  * If the lkb has been flagged for a force unlock or cancel,
5072                  * then the reprocessing below will be replaced by just doing
5073                  * the unlock/cancel directly.
5074                  */
5075                 mstype = lkb->lkb_wait_type;
5076                 oc = test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
5077                                         &lkb->lkb_iflags);
5078                 ou = test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT,
5079                                         &lkb->lkb_iflags);
5080                 err = 0;
5081
5082                 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5083                           "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
5084                           "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
5085                           r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
5086                           dlm_dir_nodeid(r), oc, ou);
5087
5088                 /*
5089                  * No reply to the pre-recovery operation will now be received,
5090                  * so a forced equivalent of remove_from_waiters() is needed to
5091                  * reset the waiters state that was in place before recovery.
5092                  */
5093
5094                 clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5095
5096                 /* Forcibly clear wait_type */
5097                 lkb->lkb_wait_type = 0;
5098
5099                 /*
5100                  * Forcibly reset wait_count and associated refcount.  The
5101                  * wait_count will almost always be 1, but in case of an
5102                  * overlapping unlock/cancel it could be 2: see where
5103                  * add_to_waiters() finds the lkb is already on the waiters
5104                  * list and does lkb_wait_count++; hold_lkb().
5105                  */
5106                 while (lkb->lkb_wait_count) {
5107                         lkb->lkb_wait_count--;
5108                         unhold_lkb(lkb);
5109                 }
5110
5111                 /* Forcibly remove from waiters list */
5112                 mutex_lock(&ls->ls_waiters_mutex);
5113                 list_del_init(&lkb->lkb_wait_reply);
5114                 mutex_unlock(&ls->ls_waiters_mutex);
5115
5116                 /*
5117                  * The lkb is now clear of all prior waiters state and can be
5118                  * processed locally, or sent to remote node again, or directly
5119                  * cancelled/unlocked.
5120                  */
5121
5122                 if (oc || ou) {
5123                         /* do an unlock or cancel instead of resending */
5124                         switch (mstype) {
5125                         case DLM_MSG_LOOKUP:
5126                         case DLM_MSG_REQUEST:
5127                                 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
5128                                                         -DLM_ECANCEL);
5129                                 unhold_lkb(lkb); /* undoes create_lkb() */
5130                                 break;
5131                         case DLM_MSG_CONVERT:
5132                                 if (oc) {
5133                                         queue_cast(r, lkb, -DLM_ECANCEL);
5134                                 } else {
5135                                         lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
5136                                         _unlock_lock(r, lkb);
5137                                 }
5138                                 break;
5139                         default:
5140                                 err = 1;
5141                         }
5142                 } else {
5143                         switch (mstype) {
5144                         case DLM_MSG_LOOKUP:
5145                         case DLM_MSG_REQUEST:
5146                                 _request_lock(r, lkb);
5147                                 if (is_master(r))
5148                                         confirm_master(r, 0);
5149                                 break;
5150                         case DLM_MSG_CONVERT:
5151                                 _convert_lock(r, lkb);
5152                                 break;
5153                         default:
5154                                 err = 1;
5155                         }
5156                 }
5157
5158                 if (err) {
5159                         log_error(ls, "waiter %x msg %d r_nodeid %d "
5160                                   "dir_nodeid %d overlap %d %d",
5161                                   lkb->lkb_id, mstype, r->res_nodeid,
5162                                   dlm_dir_nodeid(r), oc, ou);
5163                 }
5164                 unlock_rsb(r);
5165                 put_rsb(r);
5166                 dlm_put_lkb(lkb);
5167         }
5168
5169         return error;
5170 }
5171
5172 static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
5173                               struct list_head *list)
5174 {
5175         struct dlm_lkb *lkb, *safe;
5176
5177         list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5178                 if (!is_master_copy(lkb))
5179                         continue;
5180
5181                 /* don't purge lkbs we've added in recover_master_copy for
5182                    the current recovery seq */
5183
5184                 if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5185                         continue;
5186
5187                 del_lkb(r, lkb);
5188
5189                 /* this put should free the lkb */
5190                 if (!dlm_put_lkb(lkb))
5191                         log_error(ls, "purged mstcpy lkb not released");
5192         }
5193 }
5194
5195 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
5196 {
5197         struct dlm_ls *ls = r->res_ls;
5198
5199         purge_mstcpy_list(ls, r, &r->res_grantqueue);
5200         purge_mstcpy_list(ls, r, &r->res_convertqueue);
5201         purge_mstcpy_list(ls, r, &r->res_waitqueue);
5202 }
5203
5204 static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
5205                             struct list_head *list,
5206                             int nodeid_gone, unsigned int *count)
5207 {
5208         struct dlm_lkb *lkb, *safe;
5209
5210         list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5211                 if (!is_master_copy(lkb))
5212                         continue;
5213
5214                 if ((lkb->lkb_nodeid == nodeid_gone) ||
5215                     dlm_is_removed(ls, lkb->lkb_nodeid)) {
5216
5217                         /* tell recover_lvb to invalidate the lvb
5218                            because a node holding EX/PW failed */
5219                         if ((lkb->lkb_exflags & DLM_LKF_VALBLK) &&
5220                             (lkb->lkb_grmode >= DLM_LOCK_PW)) {
5221                                 rsb_set_flag(r, RSB_RECOVER_LVB_INVAL);
5222                         }
5223
5224                         del_lkb(r, lkb);
5225
5226                         /* this put should free the lkb */
5227                         if (!dlm_put_lkb(lkb))
5228                                 log_error(ls, "purged dead lkb not released");
5229
5230                         rsb_set_flag(r, RSB_RECOVER_GRANT);
5231
5232                         (*count)++;
5233                 }
5234         }
5235 }
5236
5237 /* Get rid of locks held by nodes that are gone. */
5238
5239 void dlm_recover_purge(struct dlm_ls *ls)
5240 {
5241         struct dlm_rsb *r;
5242         struct dlm_member *memb;
5243         int nodes_count = 0;
5244         int nodeid_gone = 0;
5245         unsigned int lkb_count = 0;
5246
5247         /* cache one removed nodeid to optimize the common
5248            case of a single node removed */
5249
5250         list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
5251                 nodes_count++;
5252                 nodeid_gone = memb->nodeid;
5253         }
5254
5255         if (!nodes_count)
5256                 return;
5257
5258         down_write(&ls->ls_root_sem);
5259         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
5260                 hold_rsb(r);
5261                 lock_rsb(r);
5262                 if (is_master(r)) {
5263                         purge_dead_list(ls, r, &r->res_grantqueue,
5264                                         nodeid_gone, &lkb_count);
5265                         purge_dead_list(ls, r, &r->res_convertqueue,
5266                                         nodeid_gone, &lkb_count);
5267                         purge_dead_list(ls, r, &r->res_waitqueue,
5268                                         nodeid_gone, &lkb_count);
5269                 }
5270                 unlock_rsb(r);
5271                 unhold_rsb(r);
5272                 cond_resched();
5273         }
5274         up_write(&ls->ls_root_sem);
5275
5276         if (lkb_count)
5277                 log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes",
5278                           lkb_count, nodes_count);
5279 }
5280
5281 static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
5282 {
5283         struct rb_node *n;
5284         struct dlm_rsb *r;
5285
5286         spin_lock(&ls->ls_rsbtbl[bucket].lock);
5287         for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
5288                 r = rb_entry(n, struct dlm_rsb, res_hashnode);
5289
5290                 if (!rsb_flag(r, RSB_RECOVER_GRANT))
5291                         continue;
5292                 if (!is_master(r)) {
5293                         rsb_clear_flag(r, RSB_RECOVER_GRANT);
5294                         continue;
5295                 }
5296                 hold_rsb(r);
5297                 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5298                 return r;
5299         }
5300         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5301         return NULL;
5302 }
5303
5304 /*
5305  * Attempt to grant locks on resources that we are the master of.
5306  * Locks may have become grantable during recovery because locks
5307  * from departed nodes have been purged (or not rebuilt), allowing
5308  * previously blocked locks to now be granted.  The subset of rsb's
5309  * we are interested in are those with lkb's on either the convert or
5310  * waiting queues.
5311  *
5312  * Simplest would be to go through each master rsb and check for non-empty
5313  * convert or waiting queues, and attempt to grant on those rsbs.
5314  * Checking the queues requires lock_rsb, though, for which we'd need
5315  * to release the rsbtbl lock.  This would make iterating through all
5316  * rsb's very inefficient.  So, we rely on earlier recovery routines
5317  * to set RECOVER_GRANT on any rsb's that we should attempt to grant
5318  * locks for.
5319  */
5320
5321 void dlm_recover_grant(struct dlm_ls *ls)
5322 {
5323         struct dlm_rsb *r;
5324         int bucket = 0;
5325         unsigned int count = 0;
5326         unsigned int rsb_count = 0;
5327         unsigned int lkb_count = 0;
5328
5329         while (1) {
5330                 r = find_grant_rsb(ls, bucket);
5331                 if (!r) {
5332                         if (bucket == ls->ls_rsbtbl_size - 1)
5333                                 break;
5334                         bucket++;
5335                         continue;
5336                 }
5337                 rsb_count++;
5338                 count = 0;
5339                 lock_rsb(r);
5340                 /* the RECOVER_GRANT flag is checked in the grant path */
5341                 grant_pending_locks(r, &count);
5342                 rsb_clear_flag(r, RSB_RECOVER_GRANT);
5343                 lkb_count += count;
5344                 confirm_master(r, 0);
5345                 unlock_rsb(r);
5346                 put_rsb(r);
5347                 cond_resched();
5348         }
5349
5350         if (lkb_count)
5351                 log_rinfo(ls, "dlm_recover_grant %u locks on %u resources",
5352                           lkb_count, rsb_count);
5353 }
5354
5355 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
5356                                          uint32_t remid)
5357 {
5358         struct dlm_lkb *lkb;
5359
5360         list_for_each_entry(lkb, head, lkb_statequeue) {
5361                 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
5362                         return lkb;
5363         }
5364         return NULL;
5365 }
5366
5367 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
5368                                     uint32_t remid)
5369 {
5370         struct dlm_lkb *lkb;
5371
5372         lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
5373         if (lkb)
5374                 return lkb;
5375         lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
5376         if (lkb)
5377                 return lkb;
5378         lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
5379         if (lkb)
5380                 return lkb;
5381         return NULL;
5382 }
5383
5384 /* needs at least dlm_rcom + rcom_lock */
5385 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5386                                   struct dlm_rsb *r, const struct dlm_rcom *rc)
5387 {
5388         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5389
5390         lkb->lkb_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5391         lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5392         lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5393         lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5394         dlm_set_dflags_val(lkb, le32_to_cpu(rl->rl_flags));
5395         set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
5396         lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
5397         lkb->lkb_rqmode = rl->rl_rqmode;
5398         lkb->lkb_grmode = rl->rl_grmode;
5399         /* don't set lkb_status because add_lkb wants to itself */
5400
5401         lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
5402         lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
5403
5404         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
5405                 int lvblen = le16_to_cpu(rc->rc_header.h_length) -
5406                         sizeof(struct dlm_rcom) - sizeof(struct rcom_lock);
5407                 if (lvblen > ls->ls_lvblen)
5408                         return -EINVAL;
5409                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
5410                 if (!lkb->lkb_lvbptr)
5411                         return -ENOMEM;
5412                 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
5413         }
5414
5415         /* Conversions between PR and CW (middle modes) need special handling.
5416            The real granted mode of these converting locks cannot be determined
5417            until all locks have been rebuilt on the rsb (recover_conversion) */
5418
5419         if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
5420             middle_conversion(lkb)) {
5421                 rl->rl_status = DLM_LKSTS_CONVERT;
5422                 lkb->lkb_grmode = DLM_LOCK_IV;
5423                 rsb_set_flag(r, RSB_RECOVER_CONVERT);
5424         }
5425
5426         return 0;
5427 }
5428
5429 /* This lkb may have been recovered in a previous aborted recovery so we need
5430    to check if the rsb already has an lkb with the given remote nodeid/lkid.
5431    If so we just send back a standard reply.  If not, we create a new lkb with
5432    the given values and send back our lkid.  We send back our lkid by sending
5433    back the rcom_lock struct we got but with the remid field filled in. */
5434
5435 /* needs at least dlm_rcom + rcom_lock */
5436 int dlm_recover_master_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
5437                             __le32 *rl_remid, __le32 *rl_result)
5438 {
5439         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5440         struct dlm_rsb *r;
5441         struct dlm_lkb *lkb;
5442         uint32_t remid = 0;
5443         int from_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5444         int error;
5445
5446         /* init rl_remid with rcom lock rl_remid */
5447         *rl_remid = rl->rl_remid;
5448
5449         if (rl->rl_parent_lkid) {
5450                 error = -EOPNOTSUPP;
5451                 goto out;
5452         }
5453
5454         remid = le32_to_cpu(rl->rl_lkid);
5455
5456         /* In general we expect the rsb returned to be R_MASTER, but we don't
5457            have to require it.  Recovery of masters on one node can overlap
5458            recovery of locks on another node, so one node can send us MSTCPY
5459            locks before we've made ourselves master of this rsb.  We can still
5460            add new MSTCPY locks that we receive here without any harm; when
5461            we make ourselves master, dlm_recover_masters() won't touch the
5462            MSTCPY locks we've received early. */
5463
5464         error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
5465                          from_nodeid, R_RECEIVE_RECOVER, &r);
5466         if (error)
5467                 goto out;
5468
5469         lock_rsb(r);
5470
5471         if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
5472                 log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
5473                           from_nodeid, remid);
5474                 error = -EBADR;
5475                 goto out_unlock;
5476         }
5477
5478         lkb = search_remid(r, from_nodeid, remid);
5479         if (lkb) {
5480                 error = -EEXIST;
5481                 goto out_remid;
5482         }
5483
5484         error = create_lkb(ls, &lkb);
5485         if (error)
5486                 goto out_unlock;
5487
5488         error = receive_rcom_lock_args(ls, lkb, r, rc);
5489         if (error) {
5490                 __put_lkb(ls, lkb);
5491                 goto out_unlock;
5492         }
5493
5494         attach_lkb(r, lkb);
5495         add_lkb(r, lkb, rl->rl_status);
5496         ls->ls_recover_locks_in++;
5497
5498         if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
5499                 rsb_set_flag(r, RSB_RECOVER_GRANT);
5500
5501  out_remid:
5502         /* this is the new value returned to the lock holder for
5503            saving in its process-copy lkb */
5504         *rl_remid = cpu_to_le32(lkb->lkb_id);
5505
5506         lkb->lkb_recover_seq = ls->ls_recover_seq;
5507
5508  out_unlock:
5509         unlock_rsb(r);
5510         put_rsb(r);
5511  out:
5512         if (error && error != -EEXIST)
5513                 log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d",
5514                           from_nodeid, remid, error);
5515         *rl_result = cpu_to_le32(error);
5516         return error;
5517 }
5518
5519 /* needs at least dlm_rcom + rcom_lock */
5520 int dlm_recover_process_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
5521                              uint64_t seq)
5522 {
5523         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5524         struct dlm_rsb *r;
5525         struct dlm_lkb *lkb;
5526         uint32_t lkid, remid;
5527         int error, result;
5528
5529         lkid = le32_to_cpu(rl->rl_lkid);
5530         remid = le32_to_cpu(rl->rl_remid);
5531         result = le32_to_cpu(rl->rl_result);
5532
5533         error = find_lkb(ls, lkid, &lkb);
5534         if (error) {
5535                 log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
5536                           lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5537                           result);
5538                 return error;
5539         }
5540
5541         r = lkb->lkb_resource;
5542         hold_rsb(r);
5543         lock_rsb(r);
5544
5545         if (!is_process_copy(lkb)) {
5546                 log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
5547                           lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5548                           result);
5549                 dlm_dump_rsb(r);
5550                 unlock_rsb(r);
5551                 put_rsb(r);
5552                 dlm_put_lkb(lkb);
5553                 return -EINVAL;
5554         }
5555
5556         switch (result) {
5557         case -EBADR:
5558                 /* There's a chance the new master received our lock before
5559                    dlm_recover_master_reply(), this wouldn't happen if we did
5560                    a barrier between recover_masters and recover_locks. */
5561
5562                 log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
5563                           lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5564                           result);
5565         
5566                 dlm_send_rcom_lock(r, lkb, seq);
5567                 goto out;
5568         case -EEXIST:
5569         case 0:
5570                 lkb->lkb_remid = remid;
5571                 break;
5572         default:
5573                 log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
5574                           lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5575                           result);
5576         }
5577
5578         /* an ack for dlm_recover_locks() which waits for replies from
5579            all the locks it sends to new masters */
5580         dlm_recovered_lock(r);
5581  out:
5582         unlock_rsb(r);
5583         put_rsb(r);
5584         dlm_put_lkb(lkb);
5585
5586         return 0;
5587 }
5588
5589 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
5590                      int mode, uint32_t flags, void *name, unsigned int namelen)
5591 {
5592         struct dlm_lkb *lkb;
5593         struct dlm_args args;
5594         bool do_put = true;
5595         int error;
5596
5597         dlm_lock_recovery(ls);
5598
5599         error = create_lkb(ls, &lkb);
5600         if (error) {
5601                 kfree(ua);
5602                 goto out;
5603         }
5604
5605         trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
5606
5607         if (flags & DLM_LKF_VALBLK) {
5608                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5609                 if (!ua->lksb.sb_lvbptr) {
5610                         kfree(ua);
5611                         error = -ENOMEM;
5612                         goto out_put;
5613                 }
5614         }
5615         error = set_lock_args(mode, &ua->lksb, flags, namelen, fake_astfn, ua,
5616                               fake_bastfn, &args);
5617         if (error) {
5618                 kfree(ua->lksb.sb_lvbptr);
5619                 ua->lksb.sb_lvbptr = NULL;
5620                 kfree(ua);
5621                 goto out_put;
5622         }
5623
5624         /* After ua is attached to lkb it will be freed by dlm_free_lkb().
5625            When DLM_DFL_USER_BIT is set, the dlm knows that this is a userspace
5626            lock and that lkb_astparam is the dlm_user_args structure. */
5627         set_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags);
5628         error = request_lock(ls, lkb, name, namelen, &args);
5629
5630         switch (error) {
5631         case 0:
5632                 break;
5633         case -EINPROGRESS:
5634                 error = 0;
5635                 break;
5636         case -EAGAIN:
5637                 error = 0;
5638                 fallthrough;
5639         default:
5640                 goto out_put;
5641         }
5642
5643         /* add this new lkb to the per-process list of locks */
5644         spin_lock(&ua->proc->locks_spin);
5645         hold_lkb(lkb);
5646         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5647         spin_unlock(&ua->proc->locks_spin);
5648         do_put = false;
5649  out_put:
5650         trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, false);
5651         if (do_put)
5652                 __put_lkb(ls, lkb);
5653  out:
5654         dlm_unlock_recovery(ls);
5655         return error;
5656 }
5657
5658 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5659                      int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
5660 {
5661         struct dlm_lkb *lkb;
5662         struct dlm_args args;
5663         struct dlm_user_args *ua;
5664         int error;
5665
5666         dlm_lock_recovery(ls);
5667
5668         error = find_lkb(ls, lkid, &lkb);
5669         if (error)
5670                 goto out;
5671
5672         trace_dlm_lock_start(ls, lkb, NULL, 0, mode, flags);
5673
5674         /* user can change the params on its lock when it converts it, or
5675            add an lvb that didn't exist before */
5676
5677         ua = lkb->lkb_ua;
5678
5679         if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
5680                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5681                 if (!ua->lksb.sb_lvbptr) {
5682                         error = -ENOMEM;
5683                         goto out_put;
5684                 }
5685         }
5686         if (lvb_in && ua->lksb.sb_lvbptr)
5687                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5688
5689         ua->xid = ua_tmp->xid;
5690         ua->castparam = ua_tmp->castparam;
5691         ua->castaddr = ua_tmp->castaddr;
5692         ua->bastparam = ua_tmp->bastparam;
5693         ua->bastaddr = ua_tmp->bastaddr;
5694         ua->user_lksb = ua_tmp->user_lksb;
5695
5696         error = set_lock_args(mode, &ua->lksb, flags, 0, fake_astfn, ua,
5697                               fake_bastfn, &args);
5698         if (error)
5699                 goto out_put;
5700
5701         error = convert_lock(ls, lkb, &args);
5702
5703         if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
5704                 error = 0;
5705  out_put:
5706         trace_dlm_lock_end(ls, lkb, NULL, 0, mode, flags, error, false);
5707         dlm_put_lkb(lkb);
5708  out:
5709         dlm_unlock_recovery(ls);
5710         kfree(ua_tmp);
5711         return error;
5712 }
5713
5714 /*
5715  * The caller asks for an orphan lock on a given resource with a given mode.
5716  * If a matching lock exists, it's moved to the owner's list of locks and
5717  * the lkid is returned.
5718  */
5719
5720 int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5721                      int mode, uint32_t flags, void *name, unsigned int namelen,
5722                      uint32_t *lkid)
5723 {
5724         struct dlm_lkb *lkb = NULL, *iter;
5725         struct dlm_user_args *ua;
5726         int found_other_mode = 0;
5727         int rv = 0;
5728
5729         mutex_lock(&ls->ls_orphans_mutex);
5730         list_for_each_entry(iter, &ls->ls_orphans, lkb_ownqueue) {
5731                 if (iter->lkb_resource->res_length != namelen)
5732                         continue;
5733                 if (memcmp(iter->lkb_resource->res_name, name, namelen))
5734                         continue;
5735                 if (iter->lkb_grmode != mode) {
5736                         found_other_mode = 1;
5737                         continue;
5738                 }
5739
5740                 lkb = iter;
5741                 list_del_init(&iter->lkb_ownqueue);
5742                 clear_bit(DLM_DFL_ORPHAN_BIT, &iter->lkb_dflags);
5743                 *lkid = iter->lkb_id;
5744                 break;
5745         }
5746         mutex_unlock(&ls->ls_orphans_mutex);
5747
5748         if (!lkb && found_other_mode) {
5749                 rv = -EAGAIN;
5750                 goto out;
5751         }
5752
5753         if (!lkb) {
5754                 rv = -ENOENT;
5755                 goto out;
5756         }
5757
5758         lkb->lkb_exflags = flags;
5759         lkb->lkb_ownpid = (int) current->pid;
5760
5761         ua = lkb->lkb_ua;
5762
5763         ua->proc = ua_tmp->proc;
5764         ua->xid = ua_tmp->xid;
5765         ua->castparam = ua_tmp->castparam;
5766         ua->castaddr = ua_tmp->castaddr;
5767         ua->bastparam = ua_tmp->bastparam;
5768         ua->bastaddr = ua_tmp->bastaddr;
5769         ua->user_lksb = ua_tmp->user_lksb;
5770
5771         /*
5772          * The lkb reference from the ls_orphans list was not
5773          * removed above, and is now considered the reference
5774          * for the proc locks list.
5775          */
5776
5777         spin_lock(&ua->proc->locks_spin);
5778         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5779         spin_unlock(&ua->proc->locks_spin);
5780  out:
5781         kfree(ua_tmp);
5782         return rv;
5783 }
5784
5785 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5786                     uint32_t flags, uint32_t lkid, char *lvb_in)
5787 {
5788         struct dlm_lkb *lkb;
5789         struct dlm_args args;
5790         struct dlm_user_args *ua;
5791         int error;
5792
5793         dlm_lock_recovery(ls);
5794
5795         error = find_lkb(ls, lkid, &lkb);
5796         if (error)
5797                 goto out;
5798
5799         trace_dlm_unlock_start(ls, lkb, flags);
5800
5801         ua = lkb->lkb_ua;
5802
5803         if (lvb_in && ua->lksb.sb_lvbptr)
5804                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5805         if (ua_tmp->castparam)
5806                 ua->castparam = ua_tmp->castparam;
5807         ua->user_lksb = ua_tmp->user_lksb;
5808
5809         error = set_unlock_args(flags, ua, &args);
5810         if (error)
5811                 goto out_put;
5812
5813         error = unlock_lock(ls, lkb, &args);
5814
5815         if (error == -DLM_EUNLOCK)
5816                 error = 0;
5817         /* from validate_unlock_args() */
5818         if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
5819                 error = 0;
5820         if (error)
5821                 goto out_put;
5822
5823         spin_lock(&ua->proc->locks_spin);
5824         /* dlm_user_add_cb() may have already taken lkb off the proc list */
5825         if (!list_empty(&lkb->lkb_ownqueue))
5826                 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
5827         spin_unlock(&ua->proc->locks_spin);
5828  out_put:
5829         trace_dlm_unlock_end(ls, lkb, flags, error);
5830         dlm_put_lkb(lkb);
5831  out:
5832         dlm_unlock_recovery(ls);
5833         kfree(ua_tmp);
5834         return error;
5835 }
5836
5837 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5838                     uint32_t flags, uint32_t lkid)
5839 {
5840         struct dlm_lkb *lkb;
5841         struct dlm_args args;
5842         struct dlm_user_args *ua;
5843         int error;
5844
5845         dlm_lock_recovery(ls);
5846
5847         error = find_lkb(ls, lkid, &lkb);
5848         if (error)
5849                 goto out;
5850
5851         trace_dlm_unlock_start(ls, lkb, flags);
5852
5853         ua = lkb->lkb_ua;
5854         if (ua_tmp->castparam)
5855                 ua->castparam = ua_tmp->castparam;
5856         ua->user_lksb = ua_tmp->user_lksb;
5857
5858         error = set_unlock_args(flags, ua, &args);
5859         if (error)
5860                 goto out_put;
5861
5862         error = cancel_lock(ls, lkb, &args);
5863
5864         if (error == -DLM_ECANCEL)
5865                 error = 0;
5866         /* from validate_unlock_args() */
5867         if (error == -EBUSY)
5868                 error = 0;
5869  out_put:
5870         trace_dlm_unlock_end(ls, lkb, flags, error);
5871         dlm_put_lkb(lkb);
5872  out:
5873         dlm_unlock_recovery(ls);
5874         kfree(ua_tmp);
5875         return error;
5876 }
5877
5878 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
5879 {
5880         struct dlm_lkb *lkb;
5881         struct dlm_args args;
5882         struct dlm_user_args *ua;
5883         struct dlm_rsb *r;
5884         int error;
5885
5886         dlm_lock_recovery(ls);
5887
5888         error = find_lkb(ls, lkid, &lkb);
5889         if (error)
5890                 goto out;
5891
5892         trace_dlm_unlock_start(ls, lkb, flags);
5893
5894         ua = lkb->lkb_ua;
5895
5896         error = set_unlock_args(flags, ua, &args);
5897         if (error)
5898                 goto out_put;
5899
5900         /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
5901
5902         r = lkb->lkb_resource;
5903         hold_rsb(r);
5904         lock_rsb(r);
5905
5906         error = validate_unlock_args(lkb, &args);
5907         if (error)
5908                 goto out_r;
5909         set_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags);
5910
5911         error = _cancel_lock(r, lkb);
5912  out_r:
5913         unlock_rsb(r);
5914         put_rsb(r);
5915
5916         if (error == -DLM_ECANCEL)
5917                 error = 0;
5918         /* from validate_unlock_args() */
5919         if (error == -EBUSY)
5920                 error = 0;
5921  out_put:
5922         trace_dlm_unlock_end(ls, lkb, flags, error);
5923         dlm_put_lkb(lkb);
5924  out:
5925         dlm_unlock_recovery(ls);
5926         return error;
5927 }
5928
5929 /* lkb's that are removed from the waiters list by revert are just left on the
5930    orphans list with the granted orphan locks, to be freed by purge */
5931
5932 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
5933 {
5934         struct dlm_args args;
5935         int error;
5936
5937         hold_lkb(lkb); /* reference for the ls_orphans list */
5938         mutex_lock(&ls->ls_orphans_mutex);
5939         list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
5940         mutex_unlock(&ls->ls_orphans_mutex);
5941
5942         set_unlock_args(0, lkb->lkb_ua, &args);
5943
5944         error = cancel_lock(ls, lkb, &args);
5945         if (error == -DLM_ECANCEL)
5946                 error = 0;
5947         return error;
5948 }
5949
5950 /* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't
5951    granted.  Regardless of what rsb queue the lock is on, it's removed and
5952    freed.  The IVVALBLK flag causes the lvb on the resource to be invalidated
5953    if our lock is PW/EX (it's ignored if our granted mode is smaller.) */
5954
5955 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
5956 {
5957         struct dlm_args args;
5958         int error;
5959
5960         set_unlock_args(DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK,
5961                         lkb->lkb_ua, &args);
5962
5963         error = unlock_lock(ls, lkb, &args);
5964         if (error == -DLM_EUNLOCK)
5965                 error = 0;
5966         return error;
5967 }
5968
5969 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
5970    (which does lock_rsb) due to deadlock with receiving a message that does
5971    lock_rsb followed by dlm_user_add_cb() */
5972
5973 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
5974                                      struct dlm_user_proc *proc)
5975 {
5976         struct dlm_lkb *lkb = NULL;
5977
5978         spin_lock(&ls->ls_clear_proc_locks);
5979         if (list_empty(&proc->locks))
5980                 goto out;
5981
5982         lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
5983         list_del_init(&lkb->lkb_ownqueue);
5984
5985         if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
5986                 set_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags);
5987         else
5988                 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
5989  out:
5990         spin_unlock(&ls->ls_clear_proc_locks);
5991         return lkb;
5992 }
5993
5994 /* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
5995    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
5996    which we clear here. */
5997
5998 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
5999    list, and no more device_writes should add lkb's to proc->locks list; so we
6000    shouldn't need to take asts_spin or locks_spin here.  this assumes that
6001    device reads/writes/closes are serialized -- FIXME: we may need to serialize
6002    them ourself. */
6003
6004 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6005 {
6006         struct dlm_lkb *lkb, *safe;
6007
6008         dlm_lock_recovery(ls);
6009
6010         while (1) {
6011                 lkb = del_proc_lock(ls, proc);
6012                 if (!lkb)
6013                         break;
6014                 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6015                         orphan_proc_lock(ls, lkb);
6016                 else
6017                         unlock_proc_lock(ls, lkb);
6018
6019                 /* this removes the reference for the proc->locks list
6020                    added by dlm_user_request, it may result in the lkb
6021                    being freed */
6022
6023                 dlm_put_lkb(lkb);
6024         }
6025
6026         spin_lock(&ls->ls_clear_proc_locks);
6027
6028         /* in-progress unlocks */
6029         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6030                 list_del_init(&lkb->lkb_ownqueue);
6031                 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6032                 dlm_put_lkb(lkb);
6033         }
6034
6035         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
6036                 dlm_purge_lkb_callbacks(lkb);
6037                 list_del_init(&lkb->lkb_cb_list);
6038                 dlm_put_lkb(lkb);
6039         }
6040
6041         spin_unlock(&ls->ls_clear_proc_locks);
6042         dlm_unlock_recovery(ls);
6043 }
6044
6045 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6046 {
6047         struct dlm_lkb *lkb, *safe;
6048
6049         while (1) {
6050                 lkb = NULL;
6051                 spin_lock(&proc->locks_spin);
6052                 if (!list_empty(&proc->locks)) {
6053                         lkb = list_entry(proc->locks.next, struct dlm_lkb,
6054                                          lkb_ownqueue);
6055                         list_del_init(&lkb->lkb_ownqueue);
6056                 }
6057                 spin_unlock(&proc->locks_spin);
6058
6059                 if (!lkb)
6060                         break;
6061
6062                 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6063                 unlock_proc_lock(ls, lkb);
6064                 dlm_put_lkb(lkb); /* ref from proc->locks list */
6065         }
6066
6067         spin_lock(&proc->locks_spin);
6068         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6069                 list_del_init(&lkb->lkb_ownqueue);
6070                 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6071                 dlm_put_lkb(lkb);
6072         }
6073         spin_unlock(&proc->locks_spin);
6074
6075         spin_lock(&proc->asts_spin);
6076         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
6077                 dlm_purge_lkb_callbacks(lkb);
6078                 list_del_init(&lkb->lkb_cb_list);
6079                 dlm_put_lkb(lkb);
6080         }
6081         spin_unlock(&proc->asts_spin);
6082 }
6083
6084 /* pid of 0 means purge all orphans */
6085
6086 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
6087 {
6088         struct dlm_lkb *lkb, *safe;
6089
6090         mutex_lock(&ls->ls_orphans_mutex);
6091         list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
6092                 if (pid && lkb->lkb_ownpid != pid)
6093                         continue;
6094                 unlock_proc_lock(ls, lkb);
6095                 list_del_init(&lkb->lkb_ownqueue);
6096                 dlm_put_lkb(lkb);
6097         }
6098         mutex_unlock(&ls->ls_orphans_mutex);
6099 }
6100
6101 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
6102 {
6103         struct dlm_message *ms;
6104         struct dlm_mhandle *mh;
6105         int error;
6106
6107         error = _create_message(ls, sizeof(struct dlm_message), nodeid,
6108                                 DLM_MSG_PURGE, &ms, &mh, GFP_NOFS);
6109         if (error)
6110                 return error;
6111         ms->m_nodeid = cpu_to_le32(nodeid);
6112         ms->m_pid = cpu_to_le32(pid);
6113
6114         return send_message(mh, ms, NULL, 0);
6115 }
6116
6117 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
6118                    int nodeid, int pid)
6119 {
6120         int error = 0;
6121
6122         if (nodeid && (nodeid != dlm_our_nodeid())) {
6123                 error = send_purge(ls, nodeid, pid);
6124         } else {
6125                 dlm_lock_recovery(ls);
6126                 if (pid == current->pid)
6127                         purge_proc_locks(ls, proc);
6128                 else
6129                         do_purge(ls, nodeid, pid);
6130                 dlm_unlock_recovery(ls);
6131         }
6132         return error;
6133 }
6134
6135 /* debug functionality */
6136 int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
6137                       int lkb_nodeid, unsigned int lkb_dflags, int lkb_status)
6138 {
6139         struct dlm_lksb *lksb;
6140         struct dlm_lkb *lkb;
6141         struct dlm_rsb *r;
6142         int error;
6143
6144         /* we currently can't set a valid user lock */
6145         if (lkb_dflags & BIT(DLM_DFL_USER_BIT))
6146                 return -EOPNOTSUPP;
6147
6148         lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
6149         if (!lksb)
6150                 return -ENOMEM;
6151
6152         error = _create_lkb(ls, &lkb, lkb_id, lkb_id + 1);
6153         if (error) {
6154                 kfree(lksb);
6155                 return error;
6156         }
6157
6158         dlm_set_dflags_val(lkb, lkb_dflags);
6159         lkb->lkb_nodeid = lkb_nodeid;
6160         lkb->lkb_lksb = lksb;
6161         /* user specific pointer, just don't have it NULL for kernel locks */
6162         if (~lkb_dflags & BIT(DLM_DFL_USER_BIT))
6163                 lkb->lkb_astparam = (void *)0xDEADBEEF;
6164
6165         error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
6166         if (error) {
6167                 kfree(lksb);
6168                 __put_lkb(ls, lkb);
6169                 return error;
6170         }
6171
6172         lock_rsb(r);
6173         attach_lkb(r, lkb);
6174         add_lkb(r, lkb, lkb_status);
6175         unlock_rsb(r);
6176         put_rsb(r);
6177
6178         return 0;
6179 }
6180
6181 int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id,
6182                                  int mstype, int to_nodeid)
6183 {
6184         struct dlm_lkb *lkb;
6185         int error;
6186
6187         error = find_lkb(ls, lkb_id, &lkb);
6188         if (error)
6189                 return error;
6190
6191         error = add_to_waiters(lkb, mstype, to_nodeid);
6192         dlm_put_lkb(lkb);
6193         return error;
6194 }
6195