GNU Linux-libre 4.14.332-gnu1
[releases.git] / fs / dlm / lock.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12
13 /* Central locking logic has four stages:
14
15    dlm_lock()
16    dlm_unlock()
17
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
58 #include <linux/types.h>
59 #include <linux/rbtree.h>
60 #include <linux/slab.h>
61 #include "dlm_internal.h"
62 #include <linux/dlm_device.h>
63 #include "memory.h"
64 #include "lowcomms.h"
65 #include "requestqueue.h"
66 #include "util.h"
67 #include "dir.h"
68 #include "member.h"
69 #include "lockspace.h"
70 #include "ast.h"
71 #include "lock.h"
72 #include "rcom.h"
73 #include "recover.h"
74 #include "lvb_table.h"
75 #include "user.h"
76 #include "config.h"
77
78 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
82 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
84 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int send_remove(struct dlm_rsb *r);
86 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
88 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
89                                     struct dlm_message *ms);
90 static int receive_extralen(struct dlm_message *ms);
91 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
92 static void del_timeout(struct dlm_lkb *lkb);
93 static void toss_rsb(struct kref *kref);
94
95 /*
96  * Lock compatibilty matrix - thanks Steve
97  * UN = Unlocked state. Not really a state, used as a flag
98  * PD = Padding. Used to make the matrix a nice power of two in size
99  * Other states are the same as the VMS DLM.
100  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
101  */
102
103 static const int __dlm_compat_matrix[8][8] = {
104       /* UN NL CR CW PR PW EX PD */
105         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
106         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
107         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
108         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
109         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
110         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
111         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
112         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
113 };
114
115 /*
116  * This defines the direction of transfer of LVB data.
117  * Granted mode is the row; requested mode is the column.
118  * Usage: matrix[grmode+1][rqmode+1]
119  * 1 = LVB is returned to the caller
120  * 0 = LVB is written to the resource
121  * -1 = nothing happens to the LVB
122  */
123
124 const int dlm_lvb_operations[8][8] = {
125         /* UN   NL  CR  CW  PR  PW  EX  PD*/
126         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
127         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
128         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
129         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
130         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
131         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
132         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
133         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
134 };
135
136 #define modes_compat(gr, rq) \
137         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
138
139 int dlm_modes_compat(int mode1, int mode2)
140 {
141         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
142 }
143
144 /*
145  * Compatibility matrix for conversions with QUECVT set.
146  * Granted mode is the row; requested mode is the column.
147  * Usage: matrix[grmode+1][rqmode+1]
148  */
149
150 static const int __quecvt_compat_matrix[8][8] = {
151       /* UN NL CR CW PR PW EX PD */
152         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
153         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
154         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
155         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
156         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
157         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
158         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
159         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
160 };
161
162 void dlm_print_lkb(struct dlm_lkb *lkb)
163 {
164         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
165                "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
166                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
167                lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
168                lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
169                (unsigned long long)lkb->lkb_recover_seq);
170 }
171
172 static void dlm_print_rsb(struct dlm_rsb *r)
173 {
174         printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
175                "rlc %d name %s\n",
176                r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
177                r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
178                r->res_name);
179 }
180
181 void dlm_dump_rsb(struct dlm_rsb *r)
182 {
183         struct dlm_lkb *lkb;
184
185         dlm_print_rsb(r);
186
187         printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
188                list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
189         printk(KERN_ERR "rsb lookup list\n");
190         list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
191                 dlm_print_lkb(lkb);
192         printk(KERN_ERR "rsb grant queue:\n");
193         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
194                 dlm_print_lkb(lkb);
195         printk(KERN_ERR "rsb convert queue:\n");
196         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
197                 dlm_print_lkb(lkb);
198         printk(KERN_ERR "rsb wait queue:\n");
199         list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
200                 dlm_print_lkb(lkb);
201 }
202
203 /* Threads cannot use the lockspace while it's being recovered */
204
205 static inline void dlm_lock_recovery(struct dlm_ls *ls)
206 {
207         down_read(&ls->ls_in_recovery);
208 }
209
210 void dlm_unlock_recovery(struct dlm_ls *ls)
211 {
212         up_read(&ls->ls_in_recovery);
213 }
214
215 int dlm_lock_recovery_try(struct dlm_ls *ls)
216 {
217         return down_read_trylock(&ls->ls_in_recovery);
218 }
219
220 static inline int can_be_queued(struct dlm_lkb *lkb)
221 {
222         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
223 }
224
225 static inline int force_blocking_asts(struct dlm_lkb *lkb)
226 {
227         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
228 }
229
230 static inline int is_demoted(struct dlm_lkb *lkb)
231 {
232         return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
233 }
234
235 static inline int is_altmode(struct dlm_lkb *lkb)
236 {
237         return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
238 }
239
240 static inline int is_granted(struct dlm_lkb *lkb)
241 {
242         return (lkb->lkb_status == DLM_LKSTS_GRANTED);
243 }
244
245 static inline int is_remote(struct dlm_rsb *r)
246 {
247         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
248         return !!r->res_nodeid;
249 }
250
251 static inline int is_process_copy(struct dlm_lkb *lkb)
252 {
253         return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
254 }
255
256 static inline int is_master_copy(struct dlm_lkb *lkb)
257 {
258         return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
259 }
260
261 static inline int middle_conversion(struct dlm_lkb *lkb)
262 {
263         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
264             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
265                 return 1;
266         return 0;
267 }
268
269 static inline int down_conversion(struct dlm_lkb *lkb)
270 {
271         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
272 }
273
274 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
275 {
276         return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
277 }
278
279 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
280 {
281         return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
282 }
283
284 static inline int is_overlap(struct dlm_lkb *lkb)
285 {
286         return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
287                                   DLM_IFL_OVERLAP_CANCEL));
288 }
289
290 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
291 {
292         if (is_master_copy(lkb))
293                 return;
294
295         del_timeout(lkb);
296
297         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
298
299         /* if the operation was a cancel, then return -DLM_ECANCEL, if a
300            timeout caused the cancel then return -ETIMEDOUT */
301         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
302                 lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
303                 rv = -ETIMEDOUT;
304         }
305
306         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
307                 lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
308                 rv = -EDEADLK;
309         }
310
311         dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
312 }
313
314 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
315 {
316         queue_cast(r, lkb,
317                    is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
318 }
319
320 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
321 {
322         if (is_master_copy(lkb)) {
323                 send_bast(r, lkb, rqmode);
324         } else {
325                 dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
326         }
327 }
328
329 /*
330  * Basic operations on rsb's and lkb's
331  */
332
333 /* This is only called to add a reference when the code already holds
334    a valid reference to the rsb, so there's no need for locking. */
335
336 static inline void hold_rsb(struct dlm_rsb *r)
337 {
338         kref_get(&r->res_ref);
339 }
340
341 void dlm_hold_rsb(struct dlm_rsb *r)
342 {
343         hold_rsb(r);
344 }
345
346 /* When all references to the rsb are gone it's transferred to
347    the tossed list for later disposal. */
348
349 static void put_rsb(struct dlm_rsb *r)
350 {
351         struct dlm_ls *ls = r->res_ls;
352         uint32_t bucket = r->res_bucket;
353
354         spin_lock(&ls->ls_rsbtbl[bucket].lock);
355         kref_put(&r->res_ref, toss_rsb);
356         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
357 }
358
359 void dlm_put_rsb(struct dlm_rsb *r)
360 {
361         put_rsb(r);
362 }
363
364 static int pre_rsb_struct(struct dlm_ls *ls)
365 {
366         struct dlm_rsb *r1, *r2;
367         int count = 0;
368
369         spin_lock(&ls->ls_new_rsb_spin);
370         if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
371                 spin_unlock(&ls->ls_new_rsb_spin);
372                 return 0;
373         }
374         spin_unlock(&ls->ls_new_rsb_spin);
375
376         r1 = dlm_allocate_rsb(ls);
377         r2 = dlm_allocate_rsb(ls);
378
379         spin_lock(&ls->ls_new_rsb_spin);
380         if (r1) {
381                 list_add(&r1->res_hashchain, &ls->ls_new_rsb);
382                 ls->ls_new_rsb_count++;
383         }
384         if (r2) {
385                 list_add(&r2->res_hashchain, &ls->ls_new_rsb);
386                 ls->ls_new_rsb_count++;
387         }
388         count = ls->ls_new_rsb_count;
389         spin_unlock(&ls->ls_new_rsb_spin);
390
391         if (!count)
392                 return -ENOMEM;
393         return 0;
394 }
395
396 /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
397    unlock any spinlocks, go back and call pre_rsb_struct again.
398    Otherwise, take an rsb off the list and return it. */
399
400 static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
401                           struct dlm_rsb **r_ret)
402 {
403         struct dlm_rsb *r;
404         int count;
405
406         spin_lock(&ls->ls_new_rsb_spin);
407         if (list_empty(&ls->ls_new_rsb)) {
408                 count = ls->ls_new_rsb_count;
409                 spin_unlock(&ls->ls_new_rsb_spin);
410                 log_debug(ls, "find_rsb retry %d %d %s",
411                           count, dlm_config.ci_new_rsb_count, name);
412                 return -EAGAIN;
413         }
414
415         r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
416         list_del(&r->res_hashchain);
417         /* Convert the empty list_head to a NULL rb_node for tree usage: */
418         memset(&r->res_hashnode, 0, sizeof(struct rb_node));
419         ls->ls_new_rsb_count--;
420         spin_unlock(&ls->ls_new_rsb_spin);
421
422         r->res_ls = ls;
423         r->res_length = len;
424         memcpy(r->res_name, name, len);
425         mutex_init(&r->res_mutex);
426
427         INIT_LIST_HEAD(&r->res_lookup);
428         INIT_LIST_HEAD(&r->res_grantqueue);
429         INIT_LIST_HEAD(&r->res_convertqueue);
430         INIT_LIST_HEAD(&r->res_waitqueue);
431         INIT_LIST_HEAD(&r->res_root_list);
432         INIT_LIST_HEAD(&r->res_recover_list);
433
434         *r_ret = r;
435         return 0;
436 }
437
438 static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
439 {
440         char maxname[DLM_RESNAME_MAXLEN];
441
442         memset(maxname, 0, DLM_RESNAME_MAXLEN);
443         memcpy(maxname, name, nlen);
444         return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
445 }
446
447 int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
448                         struct dlm_rsb **r_ret)
449 {
450         struct rb_node *node = tree->rb_node;
451         struct dlm_rsb *r;
452         int rc;
453
454         while (node) {
455                 r = rb_entry(node, struct dlm_rsb, res_hashnode);
456                 rc = rsb_cmp(r, name, len);
457                 if (rc < 0)
458                         node = node->rb_left;
459                 else if (rc > 0)
460                         node = node->rb_right;
461                 else
462                         goto found;
463         }
464         *r_ret = NULL;
465         return -EBADR;
466
467  found:
468         *r_ret = r;
469         return 0;
470 }
471
472 static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
473 {
474         struct rb_node **newn = &tree->rb_node;
475         struct rb_node *parent = NULL;
476         int rc;
477
478         while (*newn) {
479                 struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
480                                                res_hashnode);
481
482                 parent = *newn;
483                 rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
484                 if (rc < 0)
485                         newn = &parent->rb_left;
486                 else if (rc > 0)
487                         newn = &parent->rb_right;
488                 else {
489                         log_print("rsb_insert match");
490                         dlm_dump_rsb(rsb);
491                         dlm_dump_rsb(cur);
492                         return -EEXIST;
493                 }
494         }
495
496         rb_link_node(&rsb->res_hashnode, parent, newn);
497         rb_insert_color(&rsb->res_hashnode, tree);
498         return 0;
499 }
500
501 /*
502  * Find rsb in rsbtbl and potentially create/add one
503  *
504  * Delaying the release of rsb's has a similar benefit to applications keeping
505  * NL locks on an rsb, but without the guarantee that the cached master value
506  * will still be valid when the rsb is reused.  Apps aren't always smart enough
507  * to keep NL locks on an rsb that they may lock again shortly; this can lead
508  * to excessive master lookups and removals if we don't delay the release.
509  *
510  * Searching for an rsb means looking through both the normal list and toss
511  * list.  When found on the toss list the rsb is moved to the normal list with
512  * ref count of 1; when found on normal list the ref count is incremented.
513  *
514  * rsb's on the keep list are being used locally and refcounted.
515  * rsb's on the toss list are not being used locally, and are not refcounted.
516  *
517  * The toss list rsb's were either
518  * - previously used locally but not any more (were on keep list, then
519  *   moved to toss list when last refcount dropped)
520  * - created and put on toss list as a directory record for a lookup
521  *   (we are the dir node for the res, but are not using the res right now,
522  *   but some other node is)
523  *
524  * The purpose of find_rsb() is to return a refcounted rsb for local use.
525  * So, if the given rsb is on the toss list, it is moved to the keep list
526  * before being returned.
527  *
528  * toss_rsb() happens when all local usage of the rsb is done, i.e. no
529  * more refcounts exist, so the rsb is moved from the keep list to the
530  * toss list.
531  *
532  * rsb's on both keep and toss lists are used for doing a name to master
533  * lookups.  rsb's that are in use locally (and being refcounted) are on
534  * the keep list, rsb's that are not in use locally (not refcounted) and
535  * only exist for name/master lookups are on the toss list.
536  *
537  * rsb's on the toss list who's dir_nodeid is not local can have stale
538  * name/master mappings.  So, remote requests on such rsb's can potentially
539  * return with an error, which means the mapping is stale and needs to
540  * be updated with a new lookup.  (The idea behind MASTER UNCERTAIN and
541  * first_lkid is to keep only a single outstanding request on an rsb
542  * while that rsb has a potentially stale master.)
543  */
544
545 static int find_rsb_dir(struct dlm_ls *ls, char *name, int len,
546                         uint32_t hash, uint32_t b,
547                         int dir_nodeid, int from_nodeid,
548                         unsigned int flags, struct dlm_rsb **r_ret)
549 {
550         struct dlm_rsb *r = NULL;
551         int our_nodeid = dlm_our_nodeid();
552         int from_local = 0;
553         int from_other = 0;
554         int from_dir = 0;
555         int create = 0;
556         int error;
557
558         if (flags & R_RECEIVE_REQUEST) {
559                 if (from_nodeid == dir_nodeid)
560                         from_dir = 1;
561                 else
562                         from_other = 1;
563         } else if (flags & R_REQUEST) {
564                 from_local = 1;
565         }
566
567         /*
568          * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
569          * from_nodeid has sent us a lock in dlm_recover_locks, believing
570          * we're the new master.  Our local recovery may not have set
571          * res_master_nodeid to our_nodeid yet, so allow either.  Don't
572          * create the rsb; dlm_recover_process_copy() will handle EBADR
573          * by resending.
574          *
575          * If someone sends us a request, we are the dir node, and we do
576          * not find the rsb anywhere, then recreate it.  This happens if
577          * someone sends us a request after we have removed/freed an rsb
578          * from our toss list.  (They sent a request instead of lookup
579          * because they are using an rsb from their toss list.)
580          */
581
582         if (from_local || from_dir ||
583             (from_other && (dir_nodeid == our_nodeid))) {
584                 create = 1;
585         }
586
587  retry:
588         if (create) {
589                 error = pre_rsb_struct(ls);
590                 if (error < 0)
591                         goto out;
592         }
593
594         spin_lock(&ls->ls_rsbtbl[b].lock);
595
596         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
597         if (error)
598                 goto do_toss;
599         
600         /*
601          * rsb is active, so we can't check master_nodeid without lock_rsb.
602          */
603
604         kref_get(&r->res_ref);
605         error = 0;
606         goto out_unlock;
607
608
609  do_toss:
610         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
611         if (error)
612                 goto do_new;
613
614         /*
615          * rsb found inactive (master_nodeid may be out of date unless
616          * we are the dir_nodeid or were the master)  No other thread
617          * is using this rsb because it's on the toss list, so we can
618          * look at or update res_master_nodeid without lock_rsb.
619          */
620
621         if ((r->res_master_nodeid != our_nodeid) && from_other) {
622                 /* our rsb was not master, and another node (not the dir node)
623                    has sent us a request */
624                 log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s",
625                           from_nodeid, r->res_master_nodeid, dir_nodeid,
626                           r->res_name);
627                 error = -ENOTBLK;
628                 goto out_unlock;
629         }
630
631         if ((r->res_master_nodeid != our_nodeid) && from_dir) {
632                 /* don't think this should ever happen */
633                 log_error(ls, "find_rsb toss from_dir %d master %d",
634                           from_nodeid, r->res_master_nodeid);
635                 dlm_print_rsb(r);
636                 /* fix it and go on */
637                 r->res_master_nodeid = our_nodeid;
638                 r->res_nodeid = 0;
639                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
640                 r->res_first_lkid = 0;
641         }
642
643         if (from_local && (r->res_master_nodeid != our_nodeid)) {
644                 /* Because we have held no locks on this rsb,
645                    res_master_nodeid could have become stale. */
646                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
647                 r->res_first_lkid = 0;
648         }
649
650         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
651         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
652         goto out_unlock;
653
654
655  do_new:
656         /*
657          * rsb not found
658          */
659
660         if (error == -EBADR && !create)
661                 goto out_unlock;
662
663         error = get_rsb_struct(ls, name, len, &r);
664         if (error == -EAGAIN) {
665                 spin_unlock(&ls->ls_rsbtbl[b].lock);
666                 goto retry;
667         }
668         if (error)
669                 goto out_unlock;
670
671         r->res_hash = hash;
672         r->res_bucket = b;
673         r->res_dir_nodeid = dir_nodeid;
674         kref_init(&r->res_ref);
675
676         if (from_dir) {
677                 /* want to see how often this happens */
678                 log_debug(ls, "find_rsb new from_dir %d recreate %s",
679                           from_nodeid, r->res_name);
680                 r->res_master_nodeid = our_nodeid;
681                 r->res_nodeid = 0;
682                 goto out_add;
683         }
684
685         if (from_other && (dir_nodeid != our_nodeid)) {
686                 /* should never happen */
687                 log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
688                           from_nodeid, dir_nodeid, our_nodeid, r->res_name);
689                 dlm_free_rsb(r);
690                 r = NULL;
691                 error = -ENOTBLK;
692                 goto out_unlock;
693         }
694
695         if (from_other) {
696                 log_debug(ls, "find_rsb new from_other %d dir %d %s",
697                           from_nodeid, dir_nodeid, r->res_name);
698         }
699
700         if (dir_nodeid == our_nodeid) {
701                 /* When we are the dir nodeid, we can set the master
702                    node immediately */
703                 r->res_master_nodeid = our_nodeid;
704                 r->res_nodeid = 0;
705         } else {
706                 /* set_master will send_lookup to dir_nodeid */
707                 r->res_master_nodeid = 0;
708                 r->res_nodeid = -1;
709         }
710
711  out_add:
712         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
713  out_unlock:
714         spin_unlock(&ls->ls_rsbtbl[b].lock);
715  out:
716         *r_ret = r;
717         return error;
718 }
719
720 /* During recovery, other nodes can send us new MSTCPY locks (from
721    dlm_recover_locks) before we've made ourself master (in
722    dlm_recover_masters). */
723
724 static int find_rsb_nodir(struct dlm_ls *ls, char *name, int len,
725                           uint32_t hash, uint32_t b,
726                           int dir_nodeid, int from_nodeid,
727                           unsigned int flags, struct dlm_rsb **r_ret)
728 {
729         struct dlm_rsb *r = NULL;
730         int our_nodeid = dlm_our_nodeid();
731         int recover = (flags & R_RECEIVE_RECOVER);
732         int error;
733
734  retry:
735         error = pre_rsb_struct(ls);
736         if (error < 0)
737                 goto out;
738
739         spin_lock(&ls->ls_rsbtbl[b].lock);
740
741         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
742         if (error)
743                 goto do_toss;
744
745         /*
746          * rsb is active, so we can't check master_nodeid without lock_rsb.
747          */
748
749         kref_get(&r->res_ref);
750         goto out_unlock;
751
752
753  do_toss:
754         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
755         if (error)
756                 goto do_new;
757
758         /*
759          * rsb found inactive. No other thread is using this rsb because
760          * it's on the toss list, so we can look at or update
761          * res_master_nodeid without lock_rsb.
762          */
763
764         if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
765                 /* our rsb is not master, and another node has sent us a
766                    request; this should never happen */
767                 log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d",
768                           from_nodeid, r->res_master_nodeid, dir_nodeid);
769                 dlm_print_rsb(r);
770                 error = -ENOTBLK;
771                 goto out_unlock;
772         }
773
774         if (!recover && (r->res_master_nodeid != our_nodeid) &&
775             (dir_nodeid == our_nodeid)) {
776                 /* our rsb is not master, and we are dir; may as well fix it;
777                    this should never happen */
778                 log_error(ls, "find_rsb toss our %d master %d dir %d",
779                           our_nodeid, r->res_master_nodeid, dir_nodeid);
780                 dlm_print_rsb(r);
781                 r->res_master_nodeid = our_nodeid;
782                 r->res_nodeid = 0;
783         }
784
785         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
786         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
787         goto out_unlock;
788
789
790  do_new:
791         /*
792          * rsb not found
793          */
794
795         error = get_rsb_struct(ls, name, len, &r);
796         if (error == -EAGAIN) {
797                 spin_unlock(&ls->ls_rsbtbl[b].lock);
798                 goto retry;
799         }
800         if (error)
801                 goto out_unlock;
802
803         r->res_hash = hash;
804         r->res_bucket = b;
805         r->res_dir_nodeid = dir_nodeid;
806         r->res_master_nodeid = dir_nodeid;
807         r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
808         kref_init(&r->res_ref);
809
810         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
811  out_unlock:
812         spin_unlock(&ls->ls_rsbtbl[b].lock);
813  out:
814         *r_ret = r;
815         return error;
816 }
817
818 static int find_rsb(struct dlm_ls *ls, char *name, int len, int from_nodeid,
819                     unsigned int flags, struct dlm_rsb **r_ret)
820 {
821         uint32_t hash, b;
822         int dir_nodeid;
823
824         if (len > DLM_RESNAME_MAXLEN)
825                 return -EINVAL;
826
827         hash = jhash(name, len, 0);
828         b = hash & (ls->ls_rsbtbl_size - 1);
829
830         dir_nodeid = dlm_hash2nodeid(ls, hash);
831
832         if (dlm_no_directory(ls))
833                 return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid,
834                                       from_nodeid, flags, r_ret);
835         else
836                 return find_rsb_dir(ls, name, len, hash, b, dir_nodeid,
837                                       from_nodeid, flags, r_ret);
838 }
839
840 /* we have received a request and found that res_master_nodeid != our_nodeid,
841    so we need to return an error or make ourself the master */
842
843 static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
844                                   int from_nodeid)
845 {
846         if (dlm_no_directory(ls)) {
847                 log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
848                           from_nodeid, r->res_master_nodeid,
849                           r->res_dir_nodeid);
850                 dlm_print_rsb(r);
851                 return -ENOTBLK;
852         }
853
854         if (from_nodeid != r->res_dir_nodeid) {
855                 /* our rsb is not master, and another node (not the dir node)
856                    has sent us a request.  this is much more common when our
857                    master_nodeid is zero, so limit debug to non-zero.  */
858
859                 if (r->res_master_nodeid) {
860                         log_debug(ls, "validate master from_other %d master %d "
861                                   "dir %d first %x %s", from_nodeid,
862                                   r->res_master_nodeid, r->res_dir_nodeid,
863                                   r->res_first_lkid, r->res_name);
864                 }
865                 return -ENOTBLK;
866         } else {
867                 /* our rsb is not master, but the dir nodeid has sent us a
868                    request; this could happen with master 0 / res_nodeid -1 */
869
870                 if (r->res_master_nodeid) {
871                         log_error(ls, "validate master from_dir %d master %d "
872                                   "first %x %s",
873                                   from_nodeid, r->res_master_nodeid,
874                                   r->res_first_lkid, r->res_name);
875                 }
876
877                 r->res_master_nodeid = dlm_our_nodeid();
878                 r->res_nodeid = 0;
879                 return 0;
880         }
881 }
882
883 /*
884  * We're the dir node for this res and another node wants to know the
885  * master nodeid.  During normal operation (non recovery) this is only
886  * called from receive_lookup(); master lookups when the local node is
887  * the dir node are done by find_rsb().
888  *
889  * normal operation, we are the dir node for a resource
890  * . _request_lock
891  * . set_master
892  * . send_lookup
893  * . receive_lookup
894  * . dlm_master_lookup flags 0
895  *
896  * recover directory, we are rebuilding dir for all resources
897  * . dlm_recover_directory
898  * . dlm_rcom_names
899  *   remote node sends back the rsb names it is master of and we are dir of
900  * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
901  *   we either create new rsb setting remote node as master, or find existing
902  *   rsb and set master to be the remote node.
903  *
904  * recover masters, we are finding the new master for resources
905  * . dlm_recover_masters
906  * . recover_master
907  * . dlm_send_rcom_lookup
908  * . receive_rcom_lookup
909  * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
910  */
911
912 int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len,
913                       unsigned int flags, int *r_nodeid, int *result)
914 {
915         struct dlm_rsb *r = NULL;
916         uint32_t hash, b;
917         int from_master = (flags & DLM_LU_RECOVER_DIR);
918         int fix_master = (flags & DLM_LU_RECOVER_MASTER);
919         int our_nodeid = dlm_our_nodeid();
920         int dir_nodeid, error, toss_list = 0;
921
922         if (len > DLM_RESNAME_MAXLEN)
923                 return -EINVAL;
924
925         if (from_nodeid == our_nodeid) {
926                 log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
927                           our_nodeid, flags);
928                 return -EINVAL;
929         }
930
931         hash = jhash(name, len, 0);
932         b = hash & (ls->ls_rsbtbl_size - 1);
933
934         dir_nodeid = dlm_hash2nodeid(ls, hash);
935         if (dir_nodeid != our_nodeid) {
936                 log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
937                           from_nodeid, dir_nodeid, our_nodeid, hash,
938                           ls->ls_num_nodes);
939                 *r_nodeid = -1;
940                 return -EINVAL;
941         }
942
943  retry:
944         error = pre_rsb_struct(ls);
945         if (error < 0)
946                 return error;
947
948         spin_lock(&ls->ls_rsbtbl[b].lock);
949         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
950         if (!error) {
951                 /* because the rsb is active, we need to lock_rsb before
952                    checking/changing re_master_nodeid */
953
954                 hold_rsb(r);
955                 spin_unlock(&ls->ls_rsbtbl[b].lock);
956                 lock_rsb(r);
957                 goto found;
958         }
959
960         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
961         if (error)
962                 goto not_found;
963
964         /* because the rsb is inactive (on toss list), it's not refcounted
965            and lock_rsb is not used, but is protected by the rsbtbl lock */
966
967         toss_list = 1;
968  found:
969         if (r->res_dir_nodeid != our_nodeid) {
970                 /* should not happen, but may as well fix it and carry on */
971                 log_error(ls, "dlm_master_lookup res_dir %d our %d %s",
972                           r->res_dir_nodeid, our_nodeid, r->res_name);
973                 r->res_dir_nodeid = our_nodeid;
974         }
975
976         if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) {
977                 /* Recovery uses this function to set a new master when
978                    the previous master failed.  Setting NEW_MASTER will
979                    force dlm_recover_masters to call recover_master on this
980                    rsb even though the res_nodeid is no longer removed. */
981
982                 r->res_master_nodeid = from_nodeid;
983                 r->res_nodeid = from_nodeid;
984                 rsb_set_flag(r, RSB_NEW_MASTER);
985
986                 if (toss_list) {
987                         /* I don't think we should ever find it on toss list. */
988                         log_error(ls, "dlm_master_lookup fix_master on toss");
989                         dlm_dump_rsb(r);
990                 }
991         }
992
993         if (from_master && (r->res_master_nodeid != from_nodeid)) {
994                 /* this will happen if from_nodeid became master during
995                    a previous recovery cycle, and we aborted the previous
996                    cycle before recovering this master value */
997
998                 log_limit(ls, "dlm_master_lookup from_master %d "
999                           "master_nodeid %d res_nodeid %d first %x %s",
1000                           from_nodeid, r->res_master_nodeid, r->res_nodeid,
1001                           r->res_first_lkid, r->res_name);
1002
1003                 if (r->res_master_nodeid == our_nodeid) {
1004                         log_error(ls, "from_master %d our_master", from_nodeid);
1005                         dlm_dump_rsb(r);
1006                         dlm_send_rcom_lookup_dump(r, from_nodeid);
1007                         goto out_found;
1008                 }
1009
1010                 r->res_master_nodeid = from_nodeid;
1011                 r->res_nodeid = from_nodeid;
1012                 rsb_set_flag(r, RSB_NEW_MASTER);
1013         }
1014
1015         if (!r->res_master_nodeid) {
1016                 /* this will happen if recovery happens while we're looking
1017                    up the master for this rsb */
1018
1019                 log_debug(ls, "dlm_master_lookup master 0 to %d first %x %s",
1020                           from_nodeid, r->res_first_lkid, r->res_name);
1021                 r->res_master_nodeid = from_nodeid;
1022                 r->res_nodeid = from_nodeid;
1023         }
1024
1025         if (!from_master && !fix_master &&
1026             (r->res_master_nodeid == from_nodeid)) {
1027                 /* this can happen when the master sends remove, the dir node
1028                    finds the rsb on the keep list and ignores the remove,
1029                    and the former master sends a lookup */
1030
1031                 log_limit(ls, "dlm_master_lookup from master %d flags %x "
1032                           "first %x %s", from_nodeid, flags,
1033                           r->res_first_lkid, r->res_name);
1034         }
1035
1036  out_found:
1037         *r_nodeid = r->res_master_nodeid;
1038         if (result)
1039                 *result = DLM_LU_MATCH;
1040
1041         if (toss_list) {
1042                 r->res_toss_time = jiffies;
1043                 /* the rsb was inactive (on toss list) */
1044                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1045         } else {
1046                 /* the rsb was active */
1047                 unlock_rsb(r);
1048                 put_rsb(r);
1049         }
1050         return 0;
1051
1052  not_found:
1053         error = get_rsb_struct(ls, name, len, &r);
1054         if (error == -EAGAIN) {
1055                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1056                 goto retry;
1057         }
1058         if (error)
1059                 goto out_unlock;
1060
1061         r->res_hash = hash;
1062         r->res_bucket = b;
1063         r->res_dir_nodeid = our_nodeid;
1064         r->res_master_nodeid = from_nodeid;
1065         r->res_nodeid = from_nodeid;
1066         kref_init(&r->res_ref);
1067         r->res_toss_time = jiffies;
1068
1069         error = rsb_insert(r, &ls->ls_rsbtbl[b].toss);
1070         if (error) {
1071                 /* should never happen */
1072                 dlm_free_rsb(r);
1073                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1074                 goto retry;
1075         }
1076
1077         if (result)
1078                 *result = DLM_LU_ADD;
1079         *r_nodeid = from_nodeid;
1080         error = 0;
1081  out_unlock:
1082         spin_unlock(&ls->ls_rsbtbl[b].lock);
1083         return error;
1084 }
1085
1086 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
1087 {
1088         struct rb_node *n;
1089         struct dlm_rsb *r;
1090         int i;
1091
1092         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1093                 spin_lock(&ls->ls_rsbtbl[i].lock);
1094                 for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
1095                         r = rb_entry(n, struct dlm_rsb, res_hashnode);
1096                         if (r->res_hash == hash)
1097                                 dlm_dump_rsb(r);
1098                 }
1099                 spin_unlock(&ls->ls_rsbtbl[i].lock);
1100         }
1101 }
1102
1103 void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len)
1104 {
1105         struct dlm_rsb *r = NULL;
1106         uint32_t hash, b;
1107         int error;
1108
1109         hash = jhash(name, len, 0);
1110         b = hash & (ls->ls_rsbtbl_size - 1);
1111
1112         spin_lock(&ls->ls_rsbtbl[b].lock);
1113         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
1114         if (!error)
1115                 goto out_dump;
1116
1117         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1118         if (error)
1119                 goto out;
1120  out_dump:
1121         dlm_dump_rsb(r);
1122  out:
1123         spin_unlock(&ls->ls_rsbtbl[b].lock);
1124 }
1125
1126 static void toss_rsb(struct kref *kref)
1127 {
1128         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1129         struct dlm_ls *ls = r->res_ls;
1130
1131         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
1132         kref_init(&r->res_ref);
1133         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
1134         rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
1135         r->res_toss_time = jiffies;
1136         ls->ls_rsbtbl[r->res_bucket].flags |= DLM_RTF_SHRINK;
1137         if (r->res_lvbptr) {
1138                 dlm_free_lvb(r->res_lvbptr);
1139                 r->res_lvbptr = NULL;
1140         }
1141 }
1142
1143 /* See comment for unhold_lkb */
1144
1145 static void unhold_rsb(struct dlm_rsb *r)
1146 {
1147         int rv;
1148         rv = kref_put(&r->res_ref, toss_rsb);
1149         DLM_ASSERT(!rv, dlm_dump_rsb(r););
1150 }
1151
1152 static void kill_rsb(struct kref *kref)
1153 {
1154         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1155
1156         /* All work is done after the return from kref_put() so we
1157            can release the write_lock before the remove and free. */
1158
1159         DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
1160         DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
1161         DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
1162         DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
1163         DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
1164         DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
1165 }
1166
1167 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
1168    The rsb must exist as long as any lkb's for it do. */
1169
1170 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1171 {
1172         hold_rsb(r);
1173         lkb->lkb_resource = r;
1174 }
1175
1176 static void detach_lkb(struct dlm_lkb *lkb)
1177 {
1178         if (lkb->lkb_resource) {
1179                 put_rsb(lkb->lkb_resource);
1180                 lkb->lkb_resource = NULL;
1181         }
1182 }
1183
1184 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
1185 {
1186         struct dlm_lkb *lkb;
1187         int rv;
1188
1189         lkb = dlm_allocate_lkb(ls);
1190         if (!lkb)
1191                 return -ENOMEM;
1192
1193         lkb->lkb_nodeid = -1;
1194         lkb->lkb_grmode = DLM_LOCK_IV;
1195         kref_init(&lkb->lkb_ref);
1196         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
1197         INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
1198         INIT_LIST_HEAD(&lkb->lkb_time_list);
1199         INIT_LIST_HEAD(&lkb->lkb_cb_list);
1200         mutex_init(&lkb->lkb_cb_mutex);
1201         INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
1202
1203         idr_preload(GFP_NOFS);
1204         spin_lock(&ls->ls_lkbidr_spin);
1205         rv = idr_alloc(&ls->ls_lkbidr, lkb, 1, 0, GFP_NOWAIT);
1206         if (rv >= 0)
1207                 lkb->lkb_id = rv;
1208         spin_unlock(&ls->ls_lkbidr_spin);
1209         idr_preload_end();
1210
1211         if (rv < 0) {
1212                 log_error(ls, "create_lkb idr error %d", rv);
1213                 dlm_free_lkb(lkb);
1214                 return rv;
1215         }
1216
1217         *lkb_ret = lkb;
1218         return 0;
1219 }
1220
1221 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
1222 {
1223         struct dlm_lkb *lkb;
1224
1225         spin_lock(&ls->ls_lkbidr_spin);
1226         lkb = idr_find(&ls->ls_lkbidr, lkid);
1227         if (lkb)
1228                 kref_get(&lkb->lkb_ref);
1229         spin_unlock(&ls->ls_lkbidr_spin);
1230
1231         *lkb_ret = lkb;
1232         return lkb ? 0 : -ENOENT;
1233 }
1234
1235 static void kill_lkb(struct kref *kref)
1236 {
1237         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1238
1239         /* All work is done after the return from kref_put() so we
1240            can release the write_lock before the detach_lkb */
1241
1242         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1243 }
1244
1245 /* __put_lkb() is used when an lkb may not have an rsb attached to
1246    it so we need to provide the lockspace explicitly */
1247
1248 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
1249 {
1250         uint32_t lkid = lkb->lkb_id;
1251
1252         spin_lock(&ls->ls_lkbidr_spin);
1253         if (kref_put(&lkb->lkb_ref, kill_lkb)) {
1254                 idr_remove(&ls->ls_lkbidr, lkid);
1255                 spin_unlock(&ls->ls_lkbidr_spin);
1256
1257                 detach_lkb(lkb);
1258
1259                 /* for local/process lkbs, lvbptr points to caller's lksb */
1260                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
1261                         dlm_free_lvb(lkb->lkb_lvbptr);
1262                 dlm_free_lkb(lkb);
1263                 return 1;
1264         } else {
1265                 spin_unlock(&ls->ls_lkbidr_spin);
1266                 return 0;
1267         }
1268 }
1269
1270 int dlm_put_lkb(struct dlm_lkb *lkb)
1271 {
1272         struct dlm_ls *ls;
1273
1274         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
1275         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
1276
1277         ls = lkb->lkb_resource->res_ls;
1278         return __put_lkb(ls, lkb);
1279 }
1280
1281 /* This is only called to add a reference when the code already holds
1282    a valid reference to the lkb, so there's no need for locking. */
1283
1284 static inline void hold_lkb(struct dlm_lkb *lkb)
1285 {
1286         kref_get(&lkb->lkb_ref);
1287 }
1288
1289 /* This is called when we need to remove a reference and are certain
1290    it's not the last ref.  e.g. del_lkb is always called between a
1291    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
1292    put_lkb would work fine, but would involve unnecessary locking */
1293
1294 static inline void unhold_lkb(struct dlm_lkb *lkb)
1295 {
1296         int rv;
1297         rv = kref_put(&lkb->lkb_ref, kill_lkb);
1298         DLM_ASSERT(!rv, dlm_print_lkb(lkb););
1299 }
1300
1301 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
1302                             int mode)
1303 {
1304         struct dlm_lkb *lkb = NULL;
1305
1306         list_for_each_entry(lkb, head, lkb_statequeue)
1307                 if (lkb->lkb_rqmode < mode)
1308                         break;
1309
1310         __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
1311 }
1312
1313 /* add/remove lkb to rsb's grant/convert/wait queue */
1314
1315 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
1316 {
1317         kref_get(&lkb->lkb_ref);
1318
1319         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1320
1321         lkb->lkb_timestamp = ktime_get();
1322
1323         lkb->lkb_status = status;
1324
1325         switch (status) {
1326         case DLM_LKSTS_WAITING:
1327                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1328                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
1329                 else
1330                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
1331                 break;
1332         case DLM_LKSTS_GRANTED:
1333                 /* convention says granted locks kept in order of grmode */
1334                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
1335                                 lkb->lkb_grmode);
1336                 break;
1337         case DLM_LKSTS_CONVERT:
1338                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1339                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
1340                 else
1341                         list_add_tail(&lkb->lkb_statequeue,
1342                                       &r->res_convertqueue);
1343                 break;
1344         default:
1345                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
1346         }
1347 }
1348
1349 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1350 {
1351         lkb->lkb_status = 0;
1352         list_del(&lkb->lkb_statequeue);
1353         unhold_lkb(lkb);
1354 }
1355
1356 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
1357 {
1358         hold_lkb(lkb);
1359         del_lkb(r, lkb);
1360         add_lkb(r, lkb, sts);
1361         unhold_lkb(lkb);
1362 }
1363
1364 static int msg_reply_type(int mstype)
1365 {
1366         switch (mstype) {
1367         case DLM_MSG_REQUEST:
1368                 return DLM_MSG_REQUEST_REPLY;
1369         case DLM_MSG_CONVERT:
1370                 return DLM_MSG_CONVERT_REPLY;
1371         case DLM_MSG_UNLOCK:
1372                 return DLM_MSG_UNLOCK_REPLY;
1373         case DLM_MSG_CANCEL:
1374                 return DLM_MSG_CANCEL_REPLY;
1375         case DLM_MSG_LOOKUP:
1376                 return DLM_MSG_LOOKUP_REPLY;
1377         }
1378         return -1;
1379 }
1380
1381 static int nodeid_warned(int nodeid, int num_nodes, int *warned)
1382 {
1383         int i;
1384
1385         for (i = 0; i < num_nodes; i++) {
1386                 if (!warned[i]) {
1387                         warned[i] = nodeid;
1388                         return 0;
1389                 }
1390                 if (warned[i] == nodeid)
1391                         return 1;
1392         }
1393         return 0;
1394 }
1395
1396 void dlm_scan_waiters(struct dlm_ls *ls)
1397 {
1398         struct dlm_lkb *lkb;
1399         s64 us;
1400         s64 debug_maxus = 0;
1401         u32 debug_scanned = 0;
1402         u32 debug_expired = 0;
1403         int num_nodes = 0;
1404         int *warned = NULL;
1405
1406         if (!dlm_config.ci_waitwarn_us)
1407                 return;
1408
1409         mutex_lock(&ls->ls_waiters_mutex);
1410
1411         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1412                 if (!lkb->lkb_wait_time)
1413                         continue;
1414
1415                 debug_scanned++;
1416
1417                 us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));
1418
1419                 if (us < dlm_config.ci_waitwarn_us)
1420                         continue;
1421
1422                 lkb->lkb_wait_time = 0;
1423
1424                 debug_expired++;
1425                 if (us > debug_maxus)
1426                         debug_maxus = us;
1427
1428                 if (!num_nodes) {
1429                         num_nodes = ls->ls_num_nodes;
1430                         warned = kcalloc(num_nodes, sizeof(int), GFP_KERNEL);
1431                 }
1432                 if (!warned)
1433                         continue;
1434                 if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
1435                         continue;
1436
1437                 log_error(ls, "waitwarn %x %lld %d us check connection to "
1438                           "node %d", lkb->lkb_id, (long long)us,
1439                           dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
1440         }
1441         mutex_unlock(&ls->ls_waiters_mutex);
1442         kfree(warned);
1443
1444         if (debug_expired)
1445                 log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
1446                           debug_scanned, debug_expired,
1447                           dlm_config.ci_waitwarn_us, (long long)debug_maxus);
1448 }
1449
1450 /* add/remove lkb from global waiters list of lkb's waiting for
1451    a reply from a remote node */
1452
1453 static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
1454 {
1455         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1456         int error = 0;
1457
1458         mutex_lock(&ls->ls_waiters_mutex);
1459
1460         if (is_overlap_unlock(lkb) ||
1461             (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
1462                 error = -EINVAL;
1463                 goto out;
1464         }
1465
1466         if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
1467                 switch (mstype) {
1468                 case DLM_MSG_UNLOCK:
1469                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1470                         break;
1471                 case DLM_MSG_CANCEL:
1472                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1473                         break;
1474                 default:
1475                         error = -EBUSY;
1476                         goto out;
1477                 }
1478                 lkb->lkb_wait_count++;
1479                 hold_lkb(lkb);
1480
1481                 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
1482                           lkb->lkb_id, lkb->lkb_wait_type, mstype,
1483                           lkb->lkb_wait_count, lkb->lkb_flags);
1484                 goto out;
1485         }
1486
1487         DLM_ASSERT(!lkb->lkb_wait_count,
1488                    dlm_print_lkb(lkb);
1489                    printk("wait_count %d\n", lkb->lkb_wait_count););
1490
1491         lkb->lkb_wait_count++;
1492         lkb->lkb_wait_type = mstype;
1493         lkb->lkb_wait_time = ktime_get();
1494         lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
1495         hold_lkb(lkb);
1496         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
1497  out:
1498         if (error)
1499                 log_error(ls, "addwait error %x %d flags %x %d %d %s",
1500                           lkb->lkb_id, error, lkb->lkb_flags, mstype,
1501                           lkb->lkb_wait_type, lkb->lkb_resource->res_name);
1502         mutex_unlock(&ls->ls_waiters_mutex);
1503         return error;
1504 }
1505
1506 /* We clear the RESEND flag because we might be taking an lkb off the waiters
1507    list as part of process_requestqueue (e.g. a lookup that has an optimized
1508    request reply on the requestqueue) between dlm_recover_waiters_pre() which
1509    set RESEND and dlm_recover_waiters_post() */
1510
1511 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1512                                 struct dlm_message *ms)
1513 {
1514         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1515         int overlap_done = 0;
1516
1517         if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
1518                 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1519                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
1520                 overlap_done = 1;
1521                 goto out_del;
1522         }
1523
1524         if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
1525                 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
1526                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
1527                 overlap_done = 1;
1528                 goto out_del;
1529         }
1530
1531         /* Cancel state was preemptively cleared by a successful convert,
1532            see next comment, nothing to do. */
1533
1534         if ((mstype == DLM_MSG_CANCEL_REPLY) &&
1535             (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
1536                 log_debug(ls, "remwait %x cancel_reply wait_type %d",
1537                           lkb->lkb_id, lkb->lkb_wait_type);
1538                 return -1;
1539         }
1540
1541         /* Remove for the convert reply, and premptively remove for the
1542            cancel reply.  A convert has been granted while there's still
1543            an outstanding cancel on it (the cancel is moot and the result
1544            in the cancel reply should be 0).  We preempt the cancel reply
1545            because the app gets the convert result and then can follow up
1546            with another op, like convert.  This subsequent op would see the
1547            lingering state of the cancel and fail with -EBUSY. */
1548
1549         if ((mstype == DLM_MSG_CONVERT_REPLY) &&
1550             (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
1551             is_overlap_cancel(lkb) && ms && !ms->m_result) {
1552                 log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
1553                           lkb->lkb_id);
1554                 lkb->lkb_wait_type = 0;
1555                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
1556                 lkb->lkb_wait_count--;
1557                 unhold_lkb(lkb);
1558                 goto out_del;
1559         }
1560
1561         /* N.B. type of reply may not always correspond to type of original
1562            msg due to lookup->request optimization, verify others? */
1563
1564         if (lkb->lkb_wait_type) {
1565                 lkb->lkb_wait_type = 0;
1566                 goto out_del;
1567         }
1568
1569         log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
1570                   lkb->lkb_id, ms ? ms->m_header.h_nodeid : 0, lkb->lkb_remid,
1571                   mstype, lkb->lkb_flags);
1572         return -1;
1573
1574  out_del:
1575         /* the force-unlock/cancel has completed and we haven't recvd a reply
1576            to the op that was in progress prior to the unlock/cancel; we
1577            give up on any reply to the earlier op.  FIXME: not sure when/how
1578            this would happen */
1579
1580         if (overlap_done && lkb->lkb_wait_type) {
1581                 log_error(ls, "remwait error %x reply %d wait_type %d overlap",
1582                           lkb->lkb_id, mstype, lkb->lkb_wait_type);
1583                 lkb->lkb_wait_count--;
1584                 unhold_lkb(lkb);
1585                 lkb->lkb_wait_type = 0;
1586         }
1587
1588         DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
1589
1590         lkb->lkb_flags &= ~DLM_IFL_RESEND;
1591         lkb->lkb_wait_count--;
1592         if (!lkb->lkb_wait_count)
1593                 list_del_init(&lkb->lkb_wait_reply);
1594         unhold_lkb(lkb);
1595         return 0;
1596 }
1597
1598 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1599 {
1600         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1601         int error;
1602
1603         mutex_lock(&ls->ls_waiters_mutex);
1604         error = _remove_from_waiters(lkb, mstype, NULL);
1605         mutex_unlock(&ls->ls_waiters_mutex);
1606         return error;
1607 }
1608
1609 /* Handles situations where we might be processing a "fake" or "stub" reply in
1610    which we can't try to take waiters_mutex again. */
1611
1612 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
1613 {
1614         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1615         int error;
1616
1617         if (ms->m_flags != DLM_IFL_STUB_MS)
1618                 mutex_lock(&ls->ls_waiters_mutex);
1619         error = _remove_from_waiters(lkb, ms->m_type, ms);
1620         if (ms->m_flags != DLM_IFL_STUB_MS)
1621                 mutex_unlock(&ls->ls_waiters_mutex);
1622         return error;
1623 }
1624
1625 /* If there's an rsb for the same resource being removed, ensure
1626    that the remove message is sent before the new lookup message.
1627    It should be rare to need a delay here, but if not, then it may
1628    be worthwhile to add a proper wait mechanism rather than a delay. */
1629
1630 static void wait_pending_remove(struct dlm_rsb *r)
1631 {
1632         struct dlm_ls *ls = r->res_ls;
1633  restart:
1634         spin_lock(&ls->ls_remove_spin);
1635         if (ls->ls_remove_len &&
1636             !rsb_cmp(r, ls->ls_remove_name, ls->ls_remove_len)) {
1637                 log_debug(ls, "delay lookup for remove dir %d %s",
1638                           r->res_dir_nodeid, r->res_name);
1639                 spin_unlock(&ls->ls_remove_spin);
1640                 msleep(1);
1641                 goto restart;
1642         }
1643         spin_unlock(&ls->ls_remove_spin);
1644 }
1645
1646 /*
1647  * ls_remove_spin protects ls_remove_name and ls_remove_len which are
1648  * read by other threads in wait_pending_remove.  ls_remove_names
1649  * and ls_remove_lens are only used by the scan thread, so they do
1650  * not need protection.
1651  */
1652
1653 static void shrink_bucket(struct dlm_ls *ls, int b)
1654 {
1655         struct rb_node *n, *next;
1656         struct dlm_rsb *r;
1657         char *name;
1658         int our_nodeid = dlm_our_nodeid();
1659         int remote_count = 0;
1660         int need_shrink = 0;
1661         int i, len, rv;
1662
1663         memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);
1664
1665         spin_lock(&ls->ls_rsbtbl[b].lock);
1666
1667         if (!(ls->ls_rsbtbl[b].flags & DLM_RTF_SHRINK)) {
1668                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1669                 return;
1670         }
1671
1672         for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) {
1673                 next = rb_next(n);
1674                 r = rb_entry(n, struct dlm_rsb, res_hashnode);
1675
1676                 /* If we're the directory record for this rsb, and
1677                    we're not the master of it, then we need to wait
1678                    for the master node to send us a dir remove for
1679                    before removing the dir record. */
1680
1681                 if (!dlm_no_directory(ls) &&
1682                     (r->res_master_nodeid != our_nodeid) &&
1683                     (dlm_dir_nodeid(r) == our_nodeid)) {
1684                         continue;
1685                 }
1686
1687                 need_shrink = 1;
1688
1689                 if (!time_after_eq(jiffies, r->res_toss_time +
1690                                    dlm_config.ci_toss_secs * HZ)) {
1691                         continue;
1692                 }
1693
1694                 if (!dlm_no_directory(ls) &&
1695                     (r->res_master_nodeid == our_nodeid) &&
1696                     (dlm_dir_nodeid(r) != our_nodeid)) {
1697
1698                         /* We're the master of this rsb but we're not
1699                            the directory record, so we need to tell the
1700                            dir node to remove the dir record. */
1701
1702                         ls->ls_remove_lens[remote_count] = r->res_length;
1703                         memcpy(ls->ls_remove_names[remote_count], r->res_name,
1704                                DLM_RESNAME_MAXLEN);
1705                         remote_count++;
1706
1707                         if (remote_count >= DLM_REMOVE_NAMES_MAX)
1708                                 break;
1709                         continue;
1710                 }
1711
1712                 if (!kref_put(&r->res_ref, kill_rsb)) {
1713                         log_error(ls, "tossed rsb in use %s", r->res_name);
1714                         continue;
1715                 }
1716
1717                 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1718                 dlm_free_rsb(r);
1719         }
1720
1721         if (need_shrink)
1722                 ls->ls_rsbtbl[b].flags |= DLM_RTF_SHRINK;
1723         else
1724                 ls->ls_rsbtbl[b].flags &= ~DLM_RTF_SHRINK;
1725         spin_unlock(&ls->ls_rsbtbl[b].lock);
1726
1727         /*
1728          * While searching for rsb's to free, we found some that require
1729          * remote removal.  We leave them in place and find them again here
1730          * so there is a very small gap between removing them from the toss
1731          * list and sending the removal.  Keeping this gap small is
1732          * important to keep us (the master node) from being out of sync
1733          * with the remote dir node for very long.
1734          *
1735          * From the time the rsb is removed from toss until just after
1736          * send_remove, the rsb name is saved in ls_remove_name.  A new
1737          * lookup checks this to ensure that a new lookup message for the
1738          * same resource name is not sent just before the remove message.
1739          */
1740
1741         for (i = 0; i < remote_count; i++) {
1742                 name = ls->ls_remove_names[i];
1743                 len = ls->ls_remove_lens[i];
1744
1745                 spin_lock(&ls->ls_rsbtbl[b].lock);
1746                 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1747                 if (rv) {
1748                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1749                         log_debug(ls, "remove_name not toss %s", name);
1750                         continue;
1751                 }
1752
1753                 if (r->res_master_nodeid != our_nodeid) {
1754                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1755                         log_debug(ls, "remove_name master %d dir %d our %d %s",
1756                                   r->res_master_nodeid, r->res_dir_nodeid,
1757                                   our_nodeid, name);
1758                         continue;
1759                 }
1760
1761                 if (r->res_dir_nodeid == our_nodeid) {
1762                         /* should never happen */
1763                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1764                         log_error(ls, "remove_name dir %d master %d our %d %s",
1765                                   r->res_dir_nodeid, r->res_master_nodeid,
1766                                   our_nodeid, name);
1767                         continue;
1768                 }
1769
1770                 if (!time_after_eq(jiffies, r->res_toss_time +
1771                                    dlm_config.ci_toss_secs * HZ)) {
1772                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1773                         log_debug(ls, "remove_name toss_time %lu now %lu %s",
1774                                   r->res_toss_time, jiffies, name);
1775                         continue;
1776                 }
1777
1778                 if (!kref_put(&r->res_ref, kill_rsb)) {
1779                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1780                         log_error(ls, "remove_name in use %s", name);
1781                         continue;
1782                 }
1783
1784                 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1785
1786                 /* block lookup of same name until we've sent remove */
1787                 spin_lock(&ls->ls_remove_spin);
1788                 ls->ls_remove_len = len;
1789                 memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
1790                 spin_unlock(&ls->ls_remove_spin);
1791                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1792
1793                 send_remove(r);
1794
1795                 /* allow lookup of name again */
1796                 spin_lock(&ls->ls_remove_spin);
1797                 ls->ls_remove_len = 0;
1798                 memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
1799                 spin_unlock(&ls->ls_remove_spin);
1800
1801                 dlm_free_rsb(r);
1802         }
1803 }
1804
1805 void dlm_scan_rsbs(struct dlm_ls *ls)
1806 {
1807         int i;
1808
1809         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1810                 shrink_bucket(ls, i);
1811                 if (dlm_locking_stopped(ls))
1812                         break;
1813                 cond_resched();
1814         }
1815 }
1816
1817 static void add_timeout(struct dlm_lkb *lkb)
1818 {
1819         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1820
1821         if (is_master_copy(lkb))
1822                 return;
1823
1824         if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1825             !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1826                 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1827                 goto add_it;
1828         }
1829         if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1830                 goto add_it;
1831         return;
1832
1833  add_it:
1834         DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1835         mutex_lock(&ls->ls_timeout_mutex);
1836         hold_lkb(lkb);
1837         list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1838         mutex_unlock(&ls->ls_timeout_mutex);
1839 }
1840
1841 static void del_timeout(struct dlm_lkb *lkb)
1842 {
1843         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1844
1845         mutex_lock(&ls->ls_timeout_mutex);
1846         if (!list_empty(&lkb->lkb_time_list)) {
1847                 list_del_init(&lkb->lkb_time_list);
1848                 unhold_lkb(lkb);
1849         }
1850         mutex_unlock(&ls->ls_timeout_mutex);
1851 }
1852
1853 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1854    lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1855    and then lock rsb because of lock ordering in add_timeout.  We may need
1856    to specify some special timeout-related bits in the lkb that are just to
1857    be accessed under the timeout_mutex. */
1858
1859 void dlm_scan_timeout(struct dlm_ls *ls)
1860 {
1861         struct dlm_rsb *r;
1862         struct dlm_lkb *lkb;
1863         int do_cancel, do_warn;
1864         s64 wait_us;
1865
1866         for (;;) {
1867                 if (dlm_locking_stopped(ls))
1868                         break;
1869
1870                 do_cancel = 0;
1871                 do_warn = 0;
1872                 mutex_lock(&ls->ls_timeout_mutex);
1873                 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1874
1875                         wait_us = ktime_to_us(ktime_sub(ktime_get(),
1876                                                         lkb->lkb_timestamp));
1877
1878                         if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1879                             wait_us >= (lkb->lkb_timeout_cs * 10000))
1880                                 do_cancel = 1;
1881
1882                         if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1883                             wait_us >= dlm_config.ci_timewarn_cs * 10000)
1884                                 do_warn = 1;
1885
1886                         if (!do_cancel && !do_warn)
1887                                 continue;
1888                         hold_lkb(lkb);
1889                         break;
1890                 }
1891                 mutex_unlock(&ls->ls_timeout_mutex);
1892
1893                 if (!do_cancel && !do_warn)
1894                         break;
1895
1896                 r = lkb->lkb_resource;
1897                 hold_rsb(r);
1898                 lock_rsb(r);
1899
1900                 if (do_warn) {
1901                         /* clear flag so we only warn once */
1902                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1903                         if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1904                                 del_timeout(lkb);
1905                         dlm_timeout_warn(lkb);
1906                 }
1907
1908                 if (do_cancel) {
1909                         log_debug(ls, "timeout cancel %x node %d %s",
1910                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1911                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1912                         lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1913                         del_timeout(lkb);
1914                         _cancel_lock(r, lkb);
1915                 }
1916
1917                 unlock_rsb(r);
1918                 unhold_rsb(r);
1919                 dlm_put_lkb(lkb);
1920         }
1921 }
1922
1923 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1924    dlm_recoverd before checking/setting ls_recover_begin. */
1925
1926 void dlm_adjust_timeouts(struct dlm_ls *ls)
1927 {
1928         struct dlm_lkb *lkb;
1929         u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
1930
1931         ls->ls_recover_begin = 0;
1932         mutex_lock(&ls->ls_timeout_mutex);
1933         list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1934                 lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1935         mutex_unlock(&ls->ls_timeout_mutex);
1936
1937         if (!dlm_config.ci_waitwarn_us)
1938                 return;
1939
1940         mutex_lock(&ls->ls_waiters_mutex);
1941         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1942                 if (ktime_to_us(lkb->lkb_wait_time))
1943                         lkb->lkb_wait_time = ktime_get();
1944         }
1945         mutex_unlock(&ls->ls_waiters_mutex);
1946 }
1947
1948 /* lkb is master or local copy */
1949
1950 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1951 {
1952         int b, len = r->res_ls->ls_lvblen;
1953
1954         /* b=1 lvb returned to caller
1955            b=0 lvb written to rsb or invalidated
1956            b=-1 do nothing */
1957
1958         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1959
1960         if (b == 1) {
1961                 if (!lkb->lkb_lvbptr)
1962                         return;
1963
1964                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1965                         return;
1966
1967                 if (!r->res_lvbptr)
1968                         return;
1969
1970                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1971                 lkb->lkb_lvbseq = r->res_lvbseq;
1972
1973         } else if (b == 0) {
1974                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1975                         rsb_set_flag(r, RSB_VALNOTVALID);
1976                         return;
1977                 }
1978
1979                 if (!lkb->lkb_lvbptr)
1980                         return;
1981
1982                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1983                         return;
1984
1985                 if (!r->res_lvbptr)
1986                         r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1987
1988                 if (!r->res_lvbptr)
1989                         return;
1990
1991                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1992                 r->res_lvbseq++;
1993                 lkb->lkb_lvbseq = r->res_lvbseq;
1994                 rsb_clear_flag(r, RSB_VALNOTVALID);
1995         }
1996
1997         if (rsb_flag(r, RSB_VALNOTVALID))
1998                 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1999 }
2000
2001 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2002 {
2003         if (lkb->lkb_grmode < DLM_LOCK_PW)
2004                 return;
2005
2006         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
2007                 rsb_set_flag(r, RSB_VALNOTVALID);
2008                 return;
2009         }
2010
2011         if (!lkb->lkb_lvbptr)
2012                 return;
2013
2014         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
2015                 return;
2016
2017         if (!r->res_lvbptr)
2018                 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
2019
2020         if (!r->res_lvbptr)
2021                 return;
2022
2023         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2024         r->res_lvbseq++;
2025         rsb_clear_flag(r, RSB_VALNOTVALID);
2026 }
2027
2028 /* lkb is process copy (pc) */
2029
2030 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2031                             struct dlm_message *ms)
2032 {
2033         int b;
2034
2035         if (!lkb->lkb_lvbptr)
2036                 return;
2037
2038         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
2039                 return;
2040
2041         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
2042         if (b == 1) {
2043                 int len = receive_extralen(ms);
2044                 if (len > r->res_ls->ls_lvblen)
2045                         len = r->res_ls->ls_lvblen;
2046                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2047                 lkb->lkb_lvbseq = ms->m_lvbseq;
2048         }
2049 }
2050
2051 /* Manipulate lkb's on rsb's convert/granted/waiting queues
2052    remove_lock -- used for unlock, removes lkb from granted
2053    revert_lock -- used for cancel, moves lkb from convert to granted
2054    grant_lock  -- used for request and convert, adds lkb to granted or
2055                   moves lkb from convert or waiting to granted
2056
2057    Each of these is used for master or local copy lkb's.  There is
2058    also a _pc() variation used to make the corresponding change on
2059    a process copy (pc) lkb. */
2060
2061 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2062 {
2063         del_lkb(r, lkb);
2064         lkb->lkb_grmode = DLM_LOCK_IV;
2065         /* this unhold undoes the original ref from create_lkb()
2066            so this leads to the lkb being freed */
2067         unhold_lkb(lkb);
2068 }
2069
2070 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2071 {
2072         set_lvb_unlock(r, lkb);
2073         _remove_lock(r, lkb);
2074 }
2075
2076 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2077 {
2078         _remove_lock(r, lkb);
2079 }
2080
2081 /* returns: 0 did nothing
2082             1 moved lock to granted
2083            -1 removed lock */
2084
2085 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2086 {
2087         int rv = 0;
2088
2089         lkb->lkb_rqmode = DLM_LOCK_IV;
2090
2091         switch (lkb->lkb_status) {
2092         case DLM_LKSTS_GRANTED:
2093                 break;
2094         case DLM_LKSTS_CONVERT:
2095                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2096                 rv = 1;
2097                 break;
2098         case DLM_LKSTS_WAITING:
2099                 del_lkb(r, lkb);
2100                 lkb->lkb_grmode = DLM_LOCK_IV;
2101                 /* this unhold undoes the original ref from create_lkb()
2102                    so this leads to the lkb being freed */
2103                 unhold_lkb(lkb);
2104                 rv = -1;
2105                 break;
2106         default:
2107                 log_print("invalid status for revert %d", lkb->lkb_status);
2108         }
2109         return rv;
2110 }
2111
2112 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2113 {
2114         return revert_lock(r, lkb);
2115 }
2116
2117 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2118 {
2119         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
2120                 lkb->lkb_grmode = lkb->lkb_rqmode;
2121                 if (lkb->lkb_status)
2122                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2123                 else
2124                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
2125         }
2126
2127         lkb->lkb_rqmode = DLM_LOCK_IV;
2128         lkb->lkb_highbast = 0;
2129 }
2130
2131 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2132 {
2133         set_lvb_lock(r, lkb);
2134         _grant_lock(r, lkb);
2135 }
2136
2137 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2138                           struct dlm_message *ms)
2139 {
2140         set_lvb_lock_pc(r, lkb, ms);
2141         _grant_lock(r, lkb);
2142 }
2143
2144 /* called by grant_pending_locks() which means an async grant message must
2145    be sent to the requesting node in addition to granting the lock if the
2146    lkb belongs to a remote node. */
2147
2148 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
2149 {
2150         grant_lock(r, lkb);
2151         if (is_master_copy(lkb))
2152                 send_grant(r, lkb);
2153         else
2154                 queue_cast(r, lkb, 0);
2155 }
2156
2157 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
2158    change the granted/requested modes.  We're munging things accordingly in
2159    the process copy.
2160    CONVDEADLK: our grmode may have been forced down to NL to resolve a
2161    conversion deadlock
2162    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
2163    compatible with other granted locks */
2164
2165 static void munge_demoted(struct dlm_lkb *lkb)
2166 {
2167         if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
2168                 log_print("munge_demoted %x invalid modes gr %d rq %d",
2169                           lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
2170                 return;
2171         }
2172
2173         lkb->lkb_grmode = DLM_LOCK_NL;
2174 }
2175
2176 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
2177 {
2178         if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
2179             ms->m_type != DLM_MSG_GRANT) {
2180                 log_print("munge_altmode %x invalid reply type %d",
2181                           lkb->lkb_id, ms->m_type);
2182                 return;
2183         }
2184
2185         if (lkb->lkb_exflags & DLM_LKF_ALTPR)
2186                 lkb->lkb_rqmode = DLM_LOCK_PR;
2187         else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
2188                 lkb->lkb_rqmode = DLM_LOCK_CW;
2189         else {
2190                 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
2191                 dlm_print_lkb(lkb);
2192         }
2193 }
2194
2195 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
2196 {
2197         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
2198                                            lkb_statequeue);
2199         if (lkb->lkb_id == first->lkb_id)
2200                 return 1;
2201
2202         return 0;
2203 }
2204
2205 /* Check if the given lkb conflicts with another lkb on the queue. */
2206
2207 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
2208 {
2209         struct dlm_lkb *this;
2210
2211         list_for_each_entry(this, head, lkb_statequeue) {
2212                 if (this == lkb)
2213                         continue;
2214                 if (!modes_compat(this, lkb))
2215                         return 1;
2216         }
2217         return 0;
2218 }
2219
2220 /*
2221  * "A conversion deadlock arises with a pair of lock requests in the converting
2222  * queue for one resource.  The granted mode of each lock blocks the requested
2223  * mode of the other lock."
2224  *
2225  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
2226  * convert queue from being granted, then deadlk/demote lkb.
2227  *
2228  * Example:
2229  * Granted Queue: empty
2230  * Convert Queue: NL->EX (first lock)
2231  *                PR->EX (second lock)
2232  *
2233  * The first lock can't be granted because of the granted mode of the second
2234  * lock and the second lock can't be granted because it's not first in the
2235  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
2236  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
2237  * flag set and return DEMOTED in the lksb flags.
2238  *
2239  * Originally, this function detected conv-deadlk in a more limited scope:
2240  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
2241  * - if lkb1 was the first entry in the queue (not just earlier), and was
2242  *   blocked by the granted mode of lkb2, and there was nothing on the
2243  *   granted queue preventing lkb1 from being granted immediately, i.e.
2244  *   lkb2 was the only thing preventing lkb1 from being granted.
2245  *
2246  * That second condition meant we'd only say there was conv-deadlk if
2247  * resolving it (by demotion) would lead to the first lock on the convert
2248  * queue being granted right away.  It allowed conversion deadlocks to exist
2249  * between locks on the convert queue while they couldn't be granted anyway.
2250  *
2251  * Now, we detect and take action on conversion deadlocks immediately when
2252  * they're created, even if they may not be immediately consequential.  If
2253  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
2254  * mode that would prevent lkb1's conversion from being granted, we do a
2255  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
2256  * I think this means that the lkb_is_ahead condition below should always
2257  * be zero, i.e. there will never be conv-deadlk between two locks that are
2258  * both already on the convert queue.
2259  */
2260
2261 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
2262 {
2263         struct dlm_lkb *lkb1;
2264         int lkb_is_ahead = 0;
2265
2266         list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
2267                 if (lkb1 == lkb2) {
2268                         lkb_is_ahead = 1;
2269                         continue;
2270                 }
2271
2272                 if (!lkb_is_ahead) {
2273                         if (!modes_compat(lkb2, lkb1))
2274                                 return 1;
2275                 } else {
2276                         if (!modes_compat(lkb2, lkb1) &&
2277                             !modes_compat(lkb1, lkb2))
2278                                 return 1;
2279                 }
2280         }
2281         return 0;
2282 }
2283
2284 /*
2285  * Return 1 if the lock can be granted, 0 otherwise.
2286  * Also detect and resolve conversion deadlocks.
2287  *
2288  * lkb is the lock to be granted
2289  *
2290  * now is 1 if the function is being called in the context of the
2291  * immediate request, it is 0 if called later, after the lock has been
2292  * queued.
2293  *
2294  * recover is 1 if dlm_recover_grant() is trying to grant conversions
2295  * after recovery.
2296  *
2297  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
2298  */
2299
2300 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2301                            int recover)
2302 {
2303         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
2304
2305         /*
2306          * 6-10: Version 5.4 introduced an option to address the phenomenon of
2307          * a new request for a NL mode lock being blocked.
2308          *
2309          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
2310          * request, then it would be granted.  In essence, the use of this flag
2311          * tells the Lock Manager to expedite theis request by not considering
2312          * what may be in the CONVERTING or WAITING queues...  As of this
2313          * writing, the EXPEDITE flag can be used only with new requests for NL
2314          * mode locks.  This flag is not valid for conversion requests.
2315          *
2316          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
2317          * conversion or used with a non-NL requested mode.  We also know an
2318          * EXPEDITE request is always granted immediately, so now must always
2319          * be 1.  The full condition to grant an expedite request: (now &&
2320          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
2321          * therefore be shortened to just checking the flag.
2322          */
2323
2324         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
2325                 return 1;
2326
2327         /*
2328          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
2329          * added to the remaining conditions.
2330          */
2331
2332         if (queue_conflict(&r->res_grantqueue, lkb))
2333                 return 0;
2334
2335         /*
2336          * 6-3: By default, a conversion request is immediately granted if the
2337          * requested mode is compatible with the modes of all other granted
2338          * locks
2339          */
2340
2341         if (queue_conflict(&r->res_convertqueue, lkb))
2342                 return 0;
2343
2344         /*
2345          * The RECOVER_GRANT flag means dlm_recover_grant() is granting
2346          * locks for a recovered rsb, on which lkb's have been rebuilt.
2347          * The lkb's may have been rebuilt on the queues in a different
2348          * order than they were in on the previous master.  So, granting
2349          * queued conversions in order after recovery doesn't make sense
2350          * since the order hasn't been preserved anyway.  The new order
2351          * could also have created a new "in place" conversion deadlock.
2352          * (e.g. old, failed master held granted EX, with PR->EX, NL->EX.
2353          * After recovery, there would be no granted locks, and possibly
2354          * NL->EX, PR->EX, an in-place conversion deadlock.)  So, after
2355          * recovery, grant conversions without considering order.
2356          */
2357
2358         if (conv && recover)
2359                 return 1;
2360
2361         /*
2362          * 6-5: But the default algorithm for deciding whether to grant or
2363          * queue conversion requests does not by itself guarantee that such
2364          * requests are serviced on a "first come first serve" basis.  This, in
2365          * turn, can lead to a phenomenon known as "indefinate postponement".
2366          *
2367          * 6-7: This issue is dealt with by using the optional QUECVT flag with
2368          * the system service employed to request a lock conversion.  This flag
2369          * forces certain conversion requests to be queued, even if they are
2370          * compatible with the granted modes of other locks on the same
2371          * resource.  Thus, the use of this flag results in conversion requests
2372          * being ordered on a "first come first servce" basis.
2373          *
2374          * DCT: This condition is all about new conversions being able to occur
2375          * "in place" while the lock remains on the granted queue (assuming
2376          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
2377          * doesn't _have_ to go onto the convert queue where it's processed in
2378          * order.  The "now" variable is necessary to distinguish converts
2379          * being received and processed for the first time now, because once a
2380          * convert is moved to the conversion queue the condition below applies
2381          * requiring fifo granting.
2382          */
2383
2384         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
2385                 return 1;
2386
2387         /*
2388          * Even if the convert is compat with all granted locks,
2389          * QUECVT forces it behind other locks on the convert queue.
2390          */
2391
2392         if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
2393                 if (list_empty(&r->res_convertqueue))
2394                         return 1;
2395                 else
2396                         return 0;
2397         }
2398
2399         /*
2400          * The NOORDER flag is set to avoid the standard vms rules on grant
2401          * order.
2402          */
2403
2404         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
2405                 return 1;
2406
2407         /*
2408          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
2409          * granted until all other conversion requests ahead of it are granted
2410          * and/or canceled.
2411          */
2412
2413         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
2414                 return 1;
2415
2416         /*
2417          * 6-4: By default, a new request is immediately granted only if all
2418          * three of the following conditions are satisfied when the request is
2419          * issued:
2420          * - The queue of ungranted conversion requests for the resource is
2421          *   empty.
2422          * - The queue of ungranted new requests for the resource is empty.
2423          * - The mode of the new request is compatible with the most
2424          *   restrictive mode of all granted locks on the resource.
2425          */
2426
2427         if (now && !conv && list_empty(&r->res_convertqueue) &&
2428             list_empty(&r->res_waitqueue))
2429                 return 1;
2430
2431         /*
2432          * 6-4: Once a lock request is in the queue of ungranted new requests,
2433          * it cannot be granted until the queue of ungranted conversion
2434          * requests is empty, all ungranted new requests ahead of it are
2435          * granted and/or canceled, and it is compatible with the granted mode
2436          * of the most restrictive lock granted on the resource.
2437          */
2438
2439         if (!now && !conv && list_empty(&r->res_convertqueue) &&
2440             first_in_list(lkb, &r->res_waitqueue))
2441                 return 1;
2442
2443         return 0;
2444 }
2445
2446 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2447                           int recover, int *err)
2448 {
2449         int rv;
2450         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
2451         int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
2452
2453         if (err)
2454                 *err = 0;
2455
2456         rv = _can_be_granted(r, lkb, now, recover);
2457         if (rv)
2458                 goto out;
2459
2460         /*
2461          * The CONVDEADLK flag is non-standard and tells the dlm to resolve
2462          * conversion deadlocks by demoting grmode to NL, otherwise the dlm
2463          * cancels one of the locks.
2464          */
2465
2466         if (is_convert && can_be_queued(lkb) &&
2467             conversion_deadlock_detect(r, lkb)) {
2468                 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
2469                         lkb->lkb_grmode = DLM_LOCK_NL;
2470                         lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
2471                 } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
2472                         if (err)
2473                                 *err = -EDEADLK;
2474                         else {
2475                                 log_print("can_be_granted deadlock %x now %d",
2476                                           lkb->lkb_id, now);
2477                                 dlm_dump_rsb(r);
2478                         }
2479                 }
2480                 goto out;
2481         }
2482
2483         /*
2484          * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
2485          * to grant a request in a mode other than the normal rqmode.  It's a
2486          * simple way to provide a big optimization to applications that can
2487          * use them.
2488          */
2489
2490         if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
2491                 alt = DLM_LOCK_PR;
2492         else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
2493                 alt = DLM_LOCK_CW;
2494
2495         if (alt) {
2496                 lkb->lkb_rqmode = alt;
2497                 rv = _can_be_granted(r, lkb, now, 0);
2498                 if (rv)
2499                         lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
2500                 else
2501                         lkb->lkb_rqmode = rqmode;
2502         }
2503  out:
2504         return rv;
2505 }
2506
2507 /* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
2508    for locks pending on the convert list.  Once verified (watch for these
2509    log_prints), we should be able to just call _can_be_granted() and not
2510    bother with the demote/deadlk cases here (and there's no easy way to deal
2511    with a deadlk here, we'd have to generate something like grant_lock with
2512    the deadlk error.) */
2513
2514 /* Returns the highest requested mode of all blocked conversions; sets
2515    cw if there's a blocked conversion to DLM_LOCK_CW. */
2516
2517 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
2518                                  unsigned int *count)
2519 {
2520         struct dlm_lkb *lkb, *s;
2521         int recover = rsb_flag(r, RSB_RECOVER_GRANT);
2522         int hi, demoted, quit, grant_restart, demote_restart;
2523         int deadlk;
2524
2525         quit = 0;
2526  restart:
2527         grant_restart = 0;
2528         demote_restart = 0;
2529         hi = DLM_LOCK_IV;
2530
2531         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
2532                 demoted = is_demoted(lkb);
2533                 deadlk = 0;
2534
2535                 if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
2536                         grant_lock_pending(r, lkb);
2537                         grant_restart = 1;
2538                         if (count)
2539                                 (*count)++;
2540                         continue;
2541                 }
2542
2543                 if (!demoted && is_demoted(lkb)) {
2544                         log_print("WARN: pending demoted %x node %d %s",
2545                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2546                         demote_restart = 1;
2547                         continue;
2548                 }
2549
2550                 if (deadlk) {
2551                         log_print("WARN: pending deadlock %x node %d %s",
2552                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2553                         dlm_dump_rsb(r);
2554                         continue;
2555                 }
2556
2557                 hi = max_t(int, lkb->lkb_rqmode, hi);
2558
2559                 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
2560                         *cw = 1;
2561         }
2562
2563         if (grant_restart)
2564                 goto restart;
2565         if (demote_restart && !quit) {
2566                 quit = 1;
2567                 goto restart;
2568         }
2569
2570         return max_t(int, high, hi);
2571 }
2572
2573 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
2574                               unsigned int *count)
2575 {
2576         struct dlm_lkb *lkb, *s;
2577
2578         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
2579                 if (can_be_granted(r, lkb, 0, 0, NULL)) {
2580                         grant_lock_pending(r, lkb);
2581                         if (count)
2582                                 (*count)++;
2583                 } else {
2584                         high = max_t(int, lkb->lkb_rqmode, high);
2585                         if (lkb->lkb_rqmode == DLM_LOCK_CW)
2586                                 *cw = 1;
2587                 }
2588         }
2589
2590         return high;
2591 }
2592
2593 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
2594    on either the convert or waiting queue.
2595    high is the largest rqmode of all locks blocked on the convert or
2596    waiting queue. */
2597
2598 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
2599 {
2600         if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
2601                 if (gr->lkb_highbast < DLM_LOCK_EX)
2602                         return 1;
2603                 return 0;
2604         }
2605
2606         if (gr->lkb_highbast < high &&
2607             !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
2608                 return 1;
2609         return 0;
2610 }
2611
2612 static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
2613 {
2614         struct dlm_lkb *lkb, *s;
2615         int high = DLM_LOCK_IV;
2616         int cw = 0;
2617
2618         if (!is_master(r)) {
2619                 log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
2620                 dlm_dump_rsb(r);
2621                 return;
2622         }
2623
2624         high = grant_pending_convert(r, high, &cw, count);
2625         high = grant_pending_wait(r, high, &cw, count);
2626
2627         if (high == DLM_LOCK_IV)
2628                 return;
2629
2630         /*
2631          * If there are locks left on the wait/convert queue then send blocking
2632          * ASTs to granted locks based on the largest requested mode (high)
2633          * found above.
2634          */
2635
2636         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
2637                 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
2638                         if (cw && high == DLM_LOCK_PR &&
2639                             lkb->lkb_grmode == DLM_LOCK_PR)
2640                                 queue_bast(r, lkb, DLM_LOCK_CW);
2641                         else
2642                                 queue_bast(r, lkb, high);
2643                         lkb->lkb_highbast = high;
2644                 }
2645         }
2646 }
2647
2648 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
2649 {
2650         if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
2651             (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
2652                 if (gr->lkb_highbast < DLM_LOCK_EX)
2653                         return 1;
2654                 return 0;
2655         }
2656
2657         if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
2658                 return 1;
2659         return 0;
2660 }
2661
2662 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2663                             struct dlm_lkb *lkb)
2664 {
2665         struct dlm_lkb *gr;
2666
2667         list_for_each_entry(gr, head, lkb_statequeue) {
2668                 /* skip self when sending basts to convertqueue */
2669                 if (gr == lkb)
2670                         continue;
2671                 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2672                         queue_bast(r, gr, lkb->lkb_rqmode);
2673                         gr->lkb_highbast = lkb->lkb_rqmode;
2674                 }
2675         }
2676 }
2677
2678 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2679 {
2680         send_bast_queue(r, &r->res_grantqueue, lkb);
2681 }
2682
2683 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2684 {
2685         send_bast_queue(r, &r->res_grantqueue, lkb);
2686         send_bast_queue(r, &r->res_convertqueue, lkb);
2687 }
2688
2689 /* set_master(r, lkb) -- set the master nodeid of a resource
2690
2691    The purpose of this function is to set the nodeid field in the given
2692    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
2693    known, it can just be copied to the lkb and the function will return
2694    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
2695    before it can be copied to the lkb.
2696
2697    When the rsb nodeid is being looked up remotely, the initial lkb
2698    causing the lookup is kept on the ls_waiters list waiting for the
2699    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
2700    on the rsb's res_lookup list until the master is verified.
2701
2702    Return values:
2703    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2704    1: the rsb master is not available and the lkb has been placed on
2705       a wait queue
2706 */
2707
2708 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2709 {
2710         int our_nodeid = dlm_our_nodeid();
2711
2712         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2713                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2714                 r->res_first_lkid = lkb->lkb_id;
2715                 lkb->lkb_nodeid = r->res_nodeid;
2716                 return 0;
2717         }
2718
2719         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2720                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2721                 return 1;
2722         }
2723
2724         if (r->res_master_nodeid == our_nodeid) {
2725                 lkb->lkb_nodeid = 0;
2726                 return 0;
2727         }
2728
2729         if (r->res_master_nodeid) {
2730                 lkb->lkb_nodeid = r->res_master_nodeid;
2731                 return 0;
2732         }
2733
2734         if (dlm_dir_nodeid(r) == our_nodeid) {
2735                 /* This is a somewhat unusual case; find_rsb will usually
2736                    have set res_master_nodeid when dir nodeid is local, but
2737                    there are cases where we become the dir node after we've
2738                    past find_rsb and go through _request_lock again.
2739                    confirm_master() or process_lookup_list() needs to be
2740                    called after this. */
2741                 log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
2742                           lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2743                           r->res_name);
2744                 r->res_master_nodeid = our_nodeid;
2745                 r->res_nodeid = 0;
2746                 lkb->lkb_nodeid = 0;
2747                 return 0;
2748         }
2749
2750         wait_pending_remove(r);
2751
2752         r->res_first_lkid = lkb->lkb_id;
2753         send_lookup(r, lkb);
2754         return 1;
2755 }
2756
2757 static void process_lookup_list(struct dlm_rsb *r)
2758 {
2759         struct dlm_lkb *lkb, *safe;
2760
2761         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2762                 list_del_init(&lkb->lkb_rsb_lookup);
2763                 _request_lock(r, lkb);
2764                 schedule();
2765         }
2766 }
2767
2768 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2769
2770 static void confirm_master(struct dlm_rsb *r, int error)
2771 {
2772         struct dlm_lkb *lkb;
2773
2774         if (!r->res_first_lkid)
2775                 return;
2776
2777         switch (error) {
2778         case 0:
2779         case -EINPROGRESS:
2780                 r->res_first_lkid = 0;
2781                 process_lookup_list(r);
2782                 break;
2783
2784         case -EAGAIN:
2785         case -EBADR:
2786         case -ENOTBLK:
2787                 /* the remote request failed and won't be retried (it was
2788                    a NOQUEUE, or has been canceled/unlocked); make a waiting
2789                    lkb the first_lkid */
2790
2791                 r->res_first_lkid = 0;
2792
2793                 if (!list_empty(&r->res_lookup)) {
2794                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2795                                          lkb_rsb_lookup);
2796                         list_del_init(&lkb->lkb_rsb_lookup);
2797                         r->res_first_lkid = lkb->lkb_id;
2798                         _request_lock(r, lkb);
2799                 }
2800                 break;
2801
2802         default:
2803                 log_error(r->res_ls, "confirm_master unknown error %d", error);
2804         }
2805 }
2806
2807 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2808                          int namelen, unsigned long timeout_cs,
2809                          void (*ast) (void *astparam),
2810                          void *astparam,
2811                          void (*bast) (void *astparam, int mode),
2812                          struct dlm_args *args)
2813 {
2814         int rv = -EINVAL;
2815
2816         /* check for invalid arg usage */
2817
2818         if (mode < 0 || mode > DLM_LOCK_EX)
2819                 goto out;
2820
2821         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2822                 goto out;
2823
2824         if (flags & DLM_LKF_CANCEL)
2825                 goto out;
2826
2827         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2828                 goto out;
2829
2830         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2831                 goto out;
2832
2833         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2834                 goto out;
2835
2836         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2837                 goto out;
2838
2839         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2840                 goto out;
2841
2842         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2843                 goto out;
2844
2845         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2846                 goto out;
2847
2848         if (!ast || !lksb)
2849                 goto out;
2850
2851         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2852                 goto out;
2853
2854         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2855                 goto out;
2856
2857         /* these args will be copied to the lkb in validate_lock_args,
2858            it cannot be done now because when converting locks, fields in
2859            an active lkb cannot be modified before locking the rsb */
2860
2861         args->flags = flags;
2862         args->astfn = ast;
2863         args->astparam = astparam;
2864         args->bastfn = bast;
2865         args->timeout = timeout_cs;
2866         args->mode = mode;
2867         args->lksb = lksb;
2868         rv = 0;
2869  out:
2870         return rv;
2871 }
2872
2873 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2874 {
2875         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2876                       DLM_LKF_FORCEUNLOCK))
2877                 return -EINVAL;
2878
2879         if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2880                 return -EINVAL;
2881
2882         args->flags = flags;
2883         args->astparam = astarg;
2884         return 0;
2885 }
2886
2887 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2888                               struct dlm_args *args)
2889 {
2890         int rv = -EBUSY;
2891
2892         if (args->flags & DLM_LKF_CONVERT) {
2893                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2894                         goto out;
2895
2896                 if (lkb->lkb_wait_type)
2897                         goto out;
2898
2899                 if (is_overlap(lkb))
2900                         goto out;
2901
2902                 rv = -EINVAL;
2903                 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2904                         goto out;
2905
2906                 if (args->flags & DLM_LKF_QUECVT &&
2907                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2908                         goto out;
2909         }
2910
2911         lkb->lkb_exflags = args->flags;
2912         lkb->lkb_sbflags = 0;
2913         lkb->lkb_astfn = args->astfn;
2914         lkb->lkb_astparam = args->astparam;
2915         lkb->lkb_bastfn = args->bastfn;
2916         lkb->lkb_rqmode = args->mode;
2917         lkb->lkb_lksb = args->lksb;
2918         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2919         lkb->lkb_ownpid = (int) current->pid;
2920         lkb->lkb_timeout_cs = args->timeout;
2921         rv = 0;
2922  out:
2923         if (rv)
2924                 log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2925                           rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2926                           lkb->lkb_status, lkb->lkb_wait_type,
2927                           lkb->lkb_resource->res_name);
2928         return rv;
2929 }
2930
2931 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2932    for success */
2933
2934 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2935    because there may be a lookup in progress and it's valid to do
2936    cancel/unlockf on it */
2937
2938 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2939 {
2940         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2941         int rv = -EINVAL;
2942
2943         if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2944                 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2945                 dlm_print_lkb(lkb);
2946                 goto out;
2947         }
2948
2949         /* an lkb may still exist even though the lock is EOL'ed due to a
2950            cancel, unlock or failed noqueue request; an app can't use these
2951            locks; return same error as if the lkid had not been found at all */
2952
2953         if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2954                 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2955                 rv = -ENOENT;
2956                 goto out;
2957         }
2958
2959         /* an lkb may be waiting for an rsb lookup to complete where the
2960            lookup was initiated by another lock */
2961
2962         if (!list_empty(&lkb->lkb_rsb_lookup)) {
2963                 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2964                         log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2965                         list_del_init(&lkb->lkb_rsb_lookup);
2966                         queue_cast(lkb->lkb_resource, lkb,
2967                                    args->flags & DLM_LKF_CANCEL ?
2968                                    -DLM_ECANCEL : -DLM_EUNLOCK);
2969                         unhold_lkb(lkb); /* undoes create_lkb() */
2970                 }
2971                 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2972                 rv = -EBUSY;
2973                 goto out;
2974         }
2975
2976         /* cancel not allowed with another cancel/unlock in progress */
2977
2978         if (args->flags & DLM_LKF_CANCEL) {
2979                 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2980                         goto out;
2981
2982                 if (is_overlap(lkb))
2983                         goto out;
2984
2985                 /* don't let scand try to do a cancel */
2986                 del_timeout(lkb);
2987
2988                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2989                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2990                         rv = -EBUSY;
2991                         goto out;
2992                 }
2993
2994                 /* there's nothing to cancel */
2995                 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2996                     !lkb->lkb_wait_type) {
2997                         rv = -EBUSY;
2998                         goto out;
2999                 }
3000
3001                 switch (lkb->lkb_wait_type) {
3002                 case DLM_MSG_LOOKUP:
3003                 case DLM_MSG_REQUEST:
3004                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
3005                         rv = -EBUSY;
3006                         goto out;
3007                 case DLM_MSG_UNLOCK:
3008                 case DLM_MSG_CANCEL:
3009                         goto out;
3010                 }
3011                 /* add_to_waiters() will set OVERLAP_CANCEL */
3012                 goto out_ok;
3013         }
3014
3015         /* do we need to allow a force-unlock if there's a normal unlock
3016            already in progress?  in what conditions could the normal unlock
3017            fail such that we'd want to send a force-unlock to be sure? */
3018
3019         if (args->flags & DLM_LKF_FORCEUNLOCK) {
3020                 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
3021                         goto out;
3022
3023                 if (is_overlap_unlock(lkb))
3024                         goto out;
3025
3026                 /* don't let scand try to do a cancel */
3027                 del_timeout(lkb);
3028
3029                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
3030                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
3031                         rv = -EBUSY;
3032                         goto out;
3033                 }
3034
3035                 switch (lkb->lkb_wait_type) {
3036                 case DLM_MSG_LOOKUP:
3037                 case DLM_MSG_REQUEST:
3038                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
3039                         rv = -EBUSY;
3040                         goto out;
3041                 case DLM_MSG_UNLOCK:
3042                         goto out;
3043                 }
3044                 /* add_to_waiters() will set OVERLAP_UNLOCK */
3045                 goto out_ok;
3046         }
3047
3048         /* normal unlock not allowed if there's any op in progress */
3049         rv = -EBUSY;
3050         if (lkb->lkb_wait_type || lkb->lkb_wait_count)
3051                 goto out;
3052
3053  out_ok:
3054         /* an overlapping op shouldn't blow away exflags from other op */
3055         lkb->lkb_exflags |= args->flags;
3056         lkb->lkb_sbflags = 0;
3057         lkb->lkb_astparam = args->astparam;
3058         rv = 0;
3059  out:
3060         if (rv)
3061                 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
3062                           lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
3063                           args->flags, lkb->lkb_wait_type,
3064                           lkb->lkb_resource->res_name);
3065         return rv;
3066 }
3067
3068 /*
3069  * Four stage 4 varieties:
3070  * do_request(), do_convert(), do_unlock(), do_cancel()
3071  * These are called on the master node for the given lock and
3072  * from the central locking logic.
3073  */
3074
3075 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3076 {
3077         int error = 0;
3078
3079         if (can_be_granted(r, lkb, 1, 0, NULL)) {
3080                 grant_lock(r, lkb);
3081                 queue_cast(r, lkb, 0);
3082                 goto out;
3083         }
3084
3085         if (can_be_queued(lkb)) {
3086                 error = -EINPROGRESS;
3087                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
3088                 add_timeout(lkb);
3089                 goto out;
3090         }
3091
3092         error = -EAGAIN;
3093         queue_cast(r, lkb, -EAGAIN);
3094  out:
3095         return error;
3096 }
3097
3098 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3099                                int error)
3100 {
3101         switch (error) {
3102         case -EAGAIN:
3103                 if (force_blocking_asts(lkb))
3104                         send_blocking_asts_all(r, lkb);
3105                 break;
3106         case -EINPROGRESS:
3107                 send_blocking_asts(r, lkb);
3108                 break;
3109         }
3110 }
3111
3112 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3113 {
3114         int error = 0;
3115         int deadlk = 0;
3116
3117         /* changing an existing lock may allow others to be granted */
3118
3119         if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
3120                 grant_lock(r, lkb);
3121                 queue_cast(r, lkb, 0);
3122                 goto out;
3123         }
3124
3125         /* can_be_granted() detected that this lock would block in a conversion
3126            deadlock, so we leave it on the granted queue and return EDEADLK in
3127            the ast for the convert. */
3128
3129         if (deadlk) {
3130                 /* it's left on the granted queue */
3131                 revert_lock(r, lkb);
3132                 queue_cast(r, lkb, -EDEADLK);
3133                 error = -EDEADLK;
3134                 goto out;
3135         }
3136
3137         /* is_demoted() means the can_be_granted() above set the grmode
3138            to NL, and left us on the granted queue.  This auto-demotion
3139            (due to CONVDEADLK) might mean other locks, and/or this lock, are
3140            now grantable.  We have to try to grant other converting locks
3141            before we try again to grant this one. */
3142
3143         if (is_demoted(lkb)) {
3144                 grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
3145                 if (_can_be_granted(r, lkb, 1, 0)) {
3146                         grant_lock(r, lkb);
3147                         queue_cast(r, lkb, 0);
3148                         goto out;
3149                 }
3150                 /* else fall through and move to convert queue */
3151         }
3152
3153         if (can_be_queued(lkb)) {
3154                 error = -EINPROGRESS;
3155                 del_lkb(r, lkb);
3156                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3157                 add_timeout(lkb);
3158                 goto out;
3159         }
3160
3161         error = -EAGAIN;
3162         queue_cast(r, lkb, -EAGAIN);
3163  out:
3164         return error;
3165 }
3166
3167 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3168                                int error)
3169 {
3170         switch (error) {
3171         case 0:
3172                 grant_pending_locks(r, NULL);
3173                 /* grant_pending_locks also sends basts */
3174                 break;
3175         case -EAGAIN:
3176                 if (force_blocking_asts(lkb))
3177                         send_blocking_asts_all(r, lkb);
3178                 break;
3179         case -EINPROGRESS:
3180                 send_blocking_asts(r, lkb);
3181                 break;
3182         }
3183 }
3184
3185 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3186 {
3187         remove_lock(r, lkb);
3188         queue_cast(r, lkb, -DLM_EUNLOCK);
3189         return -DLM_EUNLOCK;
3190 }
3191
3192 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3193                               int error)
3194 {
3195         grant_pending_locks(r, NULL);
3196 }
3197
3198 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
3199
3200 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3201 {
3202         int error;
3203
3204         error = revert_lock(r, lkb);
3205         if (error) {
3206                 queue_cast(r, lkb, -DLM_ECANCEL);
3207                 return -DLM_ECANCEL;
3208         }
3209         return 0;
3210 }
3211
3212 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3213                               int error)
3214 {
3215         if (error)
3216                 grant_pending_locks(r, NULL);
3217 }
3218
3219 /*
3220  * Four stage 3 varieties:
3221  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
3222  */
3223
3224 /* add a new lkb to a possibly new rsb, called by requesting process */
3225
3226 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3227 {
3228         int error;
3229
3230         /* set_master: sets lkb nodeid from r */
3231
3232         error = set_master(r, lkb);
3233         if (error < 0)
3234                 goto out;
3235         if (error) {
3236                 error = 0;
3237                 goto out;
3238         }
3239
3240         if (is_remote(r)) {
3241                 /* receive_request() calls do_request() on remote node */
3242                 error = send_request(r, lkb);
3243         } else {
3244                 error = do_request(r, lkb);
3245                 /* for remote locks the request_reply is sent
3246                    between do_request and do_request_effects */
3247                 do_request_effects(r, lkb, error);
3248         }
3249  out:
3250         return error;
3251 }
3252
3253 /* change some property of an existing lkb, e.g. mode */
3254
3255 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3256 {
3257         int error;
3258
3259         if (is_remote(r)) {
3260                 /* receive_convert() calls do_convert() on remote node */
3261                 error = send_convert(r, lkb);
3262         } else {
3263                 error = do_convert(r, lkb);
3264                 /* for remote locks the convert_reply is sent
3265                    between do_convert and do_convert_effects */
3266                 do_convert_effects(r, lkb, error);
3267         }
3268
3269         return error;
3270 }
3271
3272 /* remove an existing lkb from the granted queue */
3273
3274 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3275 {
3276         int error;
3277
3278         if (is_remote(r)) {
3279                 /* receive_unlock() calls do_unlock() on remote node */
3280                 error = send_unlock(r, lkb);
3281         } else {
3282                 error = do_unlock(r, lkb);
3283                 /* for remote locks the unlock_reply is sent
3284                    between do_unlock and do_unlock_effects */
3285                 do_unlock_effects(r, lkb, error);
3286         }
3287
3288         return error;
3289 }
3290
3291 /* remove an existing lkb from the convert or wait queue */
3292
3293 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3294 {
3295         int error;
3296
3297         if (is_remote(r)) {
3298                 /* receive_cancel() calls do_cancel() on remote node */
3299                 error = send_cancel(r, lkb);
3300         } else {
3301                 error = do_cancel(r, lkb);
3302                 /* for remote locks the cancel_reply is sent
3303                    between do_cancel and do_cancel_effects */
3304                 do_cancel_effects(r, lkb, error);
3305         }
3306
3307         return error;
3308 }
3309
3310 /*
3311  * Four stage 2 varieties:
3312  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
3313  */
3314
3315 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
3316                         int len, struct dlm_args *args)
3317 {
3318         struct dlm_rsb *r;
3319         int error;
3320
3321         error = validate_lock_args(ls, lkb, args);
3322         if (error)
3323                 return error;
3324
3325         error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
3326         if (error)
3327                 return error;
3328
3329         lock_rsb(r);
3330
3331         attach_lkb(r, lkb);
3332         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
3333
3334         error = _request_lock(r, lkb);
3335
3336         unlock_rsb(r);
3337         put_rsb(r);
3338         return error;
3339 }
3340
3341 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3342                         struct dlm_args *args)
3343 {
3344         struct dlm_rsb *r;
3345         int error;
3346
3347         r = lkb->lkb_resource;
3348
3349         hold_rsb(r);
3350         lock_rsb(r);
3351
3352         error = validate_lock_args(ls, lkb, args);
3353         if (error)
3354                 goto out;
3355
3356         error = _convert_lock(r, lkb);
3357  out:
3358         unlock_rsb(r);
3359         put_rsb(r);
3360         return error;
3361 }
3362
3363 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3364                        struct dlm_args *args)
3365 {
3366         struct dlm_rsb *r;
3367         int error;
3368
3369         r = lkb->lkb_resource;
3370
3371         hold_rsb(r);
3372         lock_rsb(r);
3373
3374         error = validate_unlock_args(lkb, args);
3375         if (error)
3376                 goto out;
3377
3378         error = _unlock_lock(r, lkb);
3379  out:
3380         unlock_rsb(r);
3381         put_rsb(r);
3382         return error;
3383 }
3384
3385 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3386                        struct dlm_args *args)
3387 {
3388         struct dlm_rsb *r;
3389         int error;
3390
3391         r = lkb->lkb_resource;
3392
3393         hold_rsb(r);
3394         lock_rsb(r);
3395
3396         error = validate_unlock_args(lkb, args);
3397         if (error)
3398                 goto out;
3399
3400         error = _cancel_lock(r, lkb);
3401  out:
3402         unlock_rsb(r);
3403         put_rsb(r);
3404         return error;
3405 }
3406
3407 /*
3408  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
3409  */
3410
3411 int dlm_lock(dlm_lockspace_t *lockspace,
3412              int mode,
3413              struct dlm_lksb *lksb,
3414              uint32_t flags,
3415              void *name,
3416              unsigned int namelen,
3417              uint32_t parent_lkid,
3418              void (*ast) (void *astarg),
3419              void *astarg,
3420              void (*bast) (void *astarg, int mode))
3421 {
3422         struct dlm_ls *ls;
3423         struct dlm_lkb *lkb;
3424         struct dlm_args args;
3425         int error, convert = flags & DLM_LKF_CONVERT;
3426
3427         ls = dlm_find_lockspace_local(lockspace);
3428         if (!ls)
3429                 return -EINVAL;
3430
3431         dlm_lock_recovery(ls);
3432
3433         if (convert)
3434                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
3435         else
3436                 error = create_lkb(ls, &lkb);
3437
3438         if (error)
3439                 goto out;
3440
3441         error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
3442                               astarg, bast, &args);
3443         if (error)
3444                 goto out_put;
3445
3446         if (convert)
3447                 error = convert_lock(ls, lkb, &args);
3448         else
3449                 error = request_lock(ls, lkb, name, namelen, &args);
3450
3451         if (error == -EINPROGRESS)
3452                 error = 0;
3453  out_put:
3454         if (convert || error)
3455                 __put_lkb(ls, lkb);
3456         if (error == -EAGAIN || error == -EDEADLK)
3457                 error = 0;
3458  out:
3459         dlm_unlock_recovery(ls);
3460         dlm_put_lockspace(ls);
3461         return error;
3462 }
3463
3464 int dlm_unlock(dlm_lockspace_t *lockspace,
3465                uint32_t lkid,
3466                uint32_t flags,
3467                struct dlm_lksb *lksb,
3468                void *astarg)
3469 {
3470         struct dlm_ls *ls;
3471         struct dlm_lkb *lkb;
3472         struct dlm_args args;
3473         int error;
3474
3475         ls = dlm_find_lockspace_local(lockspace);
3476         if (!ls)
3477                 return -EINVAL;
3478
3479         dlm_lock_recovery(ls);
3480
3481         error = find_lkb(ls, lkid, &lkb);
3482         if (error)
3483                 goto out;
3484
3485         error = set_unlock_args(flags, astarg, &args);
3486         if (error)
3487                 goto out_put;
3488
3489         if (flags & DLM_LKF_CANCEL)
3490                 error = cancel_lock(ls, lkb, &args);
3491         else
3492                 error = unlock_lock(ls, lkb, &args);
3493
3494         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
3495                 error = 0;
3496         if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
3497                 error = 0;
3498  out_put:
3499         dlm_put_lkb(lkb);
3500  out:
3501         dlm_unlock_recovery(ls);
3502         dlm_put_lockspace(ls);
3503         return error;
3504 }
3505
3506 /*
3507  * send/receive routines for remote operations and replies
3508  *
3509  * send_args
3510  * send_common
3511  * send_request                 receive_request
3512  * send_convert                 receive_convert
3513  * send_unlock                  receive_unlock
3514  * send_cancel                  receive_cancel
3515  * send_grant                   receive_grant
3516  * send_bast                    receive_bast
3517  * send_lookup                  receive_lookup
3518  * send_remove                  receive_remove
3519  *
3520  *                              send_common_reply
3521  * receive_request_reply        send_request_reply
3522  * receive_convert_reply        send_convert_reply
3523  * receive_unlock_reply         send_unlock_reply
3524  * receive_cancel_reply         send_cancel_reply
3525  * receive_lookup_reply         send_lookup_reply
3526  */
3527
3528 static int _create_message(struct dlm_ls *ls, int mb_len,
3529                            int to_nodeid, int mstype,
3530                            struct dlm_message **ms_ret,
3531                            struct dlm_mhandle **mh_ret)
3532 {
3533         struct dlm_message *ms;
3534         struct dlm_mhandle *mh;
3535         char *mb;
3536
3537         /* get_buffer gives us a message handle (mh) that we need to
3538            pass into lowcomms_commit and a message buffer (mb) that we
3539            write our data into */
3540
3541         mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
3542         if (!mh)
3543                 return -ENOBUFS;
3544
3545         memset(mb, 0, mb_len);
3546
3547         ms = (struct dlm_message *) mb;
3548
3549         ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
3550         ms->m_header.h_lockspace = ls->ls_global_id;
3551         ms->m_header.h_nodeid = dlm_our_nodeid();
3552         ms->m_header.h_length = mb_len;
3553         ms->m_header.h_cmd = DLM_MSG;
3554
3555         ms->m_type = mstype;
3556
3557         *mh_ret = mh;
3558         *ms_ret = ms;
3559         return 0;
3560 }
3561
3562 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
3563                           int to_nodeid, int mstype,
3564                           struct dlm_message **ms_ret,
3565                           struct dlm_mhandle **mh_ret)
3566 {
3567         int mb_len = sizeof(struct dlm_message);
3568
3569         switch (mstype) {
3570         case DLM_MSG_REQUEST:
3571         case DLM_MSG_LOOKUP:
3572         case DLM_MSG_REMOVE:
3573                 mb_len += r->res_length;
3574                 break;
3575         case DLM_MSG_CONVERT:
3576         case DLM_MSG_UNLOCK:
3577         case DLM_MSG_REQUEST_REPLY:
3578         case DLM_MSG_CONVERT_REPLY:
3579         case DLM_MSG_GRANT:
3580                 if (lkb && lkb->lkb_lvbptr)
3581                         mb_len += r->res_ls->ls_lvblen;
3582                 break;
3583         }
3584
3585         return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
3586                                ms_ret, mh_ret);
3587 }
3588
3589 /* further lowcomms enhancements or alternate implementations may make
3590    the return value from this function useful at some point */
3591
3592 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
3593 {
3594         dlm_message_out(ms);
3595         dlm_lowcomms_commit_buffer(mh);
3596         return 0;
3597 }
3598
3599 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
3600                       struct dlm_message *ms)
3601 {
3602         ms->m_nodeid   = lkb->lkb_nodeid;
3603         ms->m_pid      = lkb->lkb_ownpid;
3604         ms->m_lkid     = lkb->lkb_id;
3605         ms->m_remid    = lkb->lkb_remid;
3606         ms->m_exflags  = lkb->lkb_exflags;
3607         ms->m_sbflags  = lkb->lkb_sbflags;
3608         ms->m_flags    = lkb->lkb_flags;
3609         ms->m_lvbseq   = lkb->lkb_lvbseq;
3610         ms->m_status   = lkb->lkb_status;
3611         ms->m_grmode   = lkb->lkb_grmode;
3612         ms->m_rqmode   = lkb->lkb_rqmode;
3613         ms->m_hash     = r->res_hash;
3614
3615         /* m_result and m_bastmode are set from function args,
3616            not from lkb fields */
3617
3618         if (lkb->lkb_bastfn)
3619                 ms->m_asts |= DLM_CB_BAST;
3620         if (lkb->lkb_astfn)
3621                 ms->m_asts |= DLM_CB_CAST;
3622
3623         /* compare with switch in create_message; send_remove() doesn't
3624            use send_args() */
3625
3626         switch (ms->m_type) {
3627         case DLM_MSG_REQUEST:
3628         case DLM_MSG_LOOKUP:
3629                 memcpy(ms->m_extra, r->res_name, r->res_length);
3630                 break;
3631         case DLM_MSG_CONVERT:
3632         case DLM_MSG_UNLOCK:
3633         case DLM_MSG_REQUEST_REPLY:
3634         case DLM_MSG_CONVERT_REPLY:
3635         case DLM_MSG_GRANT:
3636                 if (!lkb->lkb_lvbptr)
3637                         break;
3638                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
3639                 break;
3640         }
3641 }
3642
3643 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3644 {
3645         struct dlm_message *ms;
3646         struct dlm_mhandle *mh;
3647         int to_nodeid, error;
3648
3649         to_nodeid = r->res_nodeid;
3650
3651         error = add_to_waiters(lkb, mstype, to_nodeid);
3652         if (error)
3653                 return error;
3654
3655         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3656         if (error)
3657                 goto fail;
3658
3659         send_args(r, lkb, ms);
3660
3661         error = send_message(mh, ms);
3662         if (error)
3663                 goto fail;
3664         return 0;
3665
3666  fail:
3667         remove_from_waiters(lkb, msg_reply_type(mstype));
3668         return error;
3669 }
3670
3671 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3672 {
3673         return send_common(r, lkb, DLM_MSG_REQUEST);
3674 }
3675
3676 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3677 {
3678         int error;
3679
3680         error = send_common(r, lkb, DLM_MSG_CONVERT);
3681
3682         /* down conversions go without a reply from the master */
3683         if (!error && down_conversion(lkb)) {
3684                 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3685                 r->res_ls->ls_stub_ms.m_flags = DLM_IFL_STUB_MS;
3686                 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3687                 r->res_ls->ls_stub_ms.m_result = 0;
3688                 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
3689         }
3690
3691         return error;
3692 }
3693
3694 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
3695    MASTER_UNCERTAIN to force the next request on the rsb to confirm
3696    that the master is still correct. */
3697
3698 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3699 {
3700         return send_common(r, lkb, DLM_MSG_UNLOCK);
3701 }
3702
3703 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3704 {
3705         return send_common(r, lkb, DLM_MSG_CANCEL);
3706 }
3707
3708 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3709 {
3710         struct dlm_message *ms;
3711         struct dlm_mhandle *mh;
3712         int to_nodeid, error;
3713
3714         to_nodeid = lkb->lkb_nodeid;
3715
3716         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3717         if (error)
3718                 goto out;
3719
3720         send_args(r, lkb, ms);
3721
3722         ms->m_result = 0;
3723
3724         error = send_message(mh, ms);
3725  out:
3726         return error;
3727 }
3728
3729 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3730 {
3731         struct dlm_message *ms;
3732         struct dlm_mhandle *mh;
3733         int to_nodeid, error;
3734
3735         to_nodeid = lkb->lkb_nodeid;
3736
3737         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3738         if (error)
3739                 goto out;
3740
3741         send_args(r, lkb, ms);
3742
3743         ms->m_bastmode = mode;
3744
3745         error = send_message(mh, ms);
3746  out:
3747         return error;
3748 }
3749
3750 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3751 {
3752         struct dlm_message *ms;
3753         struct dlm_mhandle *mh;
3754         int to_nodeid, error;
3755
3756         to_nodeid = dlm_dir_nodeid(r);
3757
3758         error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3759         if (error)
3760                 return error;
3761
3762         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3763         if (error)
3764                 goto fail;
3765
3766         send_args(r, lkb, ms);
3767
3768         error = send_message(mh, ms);
3769         if (error)
3770                 goto fail;
3771         return 0;
3772
3773  fail:
3774         remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3775         return error;
3776 }
3777
3778 static int send_remove(struct dlm_rsb *r)
3779 {
3780         struct dlm_message *ms;
3781         struct dlm_mhandle *mh;
3782         int to_nodeid, error;
3783
3784         to_nodeid = dlm_dir_nodeid(r);
3785
3786         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3787         if (error)
3788                 goto out;
3789
3790         memcpy(ms->m_extra, r->res_name, r->res_length);
3791         ms->m_hash = r->res_hash;
3792
3793         error = send_message(mh, ms);
3794  out:
3795         return error;
3796 }
3797
3798 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3799                              int mstype, int rv)
3800 {
3801         struct dlm_message *ms;
3802         struct dlm_mhandle *mh;
3803         int to_nodeid, error;
3804
3805         to_nodeid = lkb->lkb_nodeid;
3806
3807         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3808         if (error)
3809                 goto out;
3810
3811         send_args(r, lkb, ms);
3812
3813         ms->m_result = rv;
3814
3815         error = send_message(mh, ms);
3816  out:
3817         return error;
3818 }
3819
3820 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3821 {
3822         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3823 }
3824
3825 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3826 {
3827         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3828 }
3829
3830 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3831 {
3832         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3833 }
3834
3835 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3836 {
3837         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3838 }
3839
3840 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3841                              int ret_nodeid, int rv)
3842 {
3843         struct dlm_rsb *r = &ls->ls_stub_rsb;
3844         struct dlm_message *ms;
3845         struct dlm_mhandle *mh;
3846         int error, nodeid = ms_in->m_header.h_nodeid;
3847
3848         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3849         if (error)
3850                 goto out;
3851
3852         ms->m_lkid = ms_in->m_lkid;
3853         ms->m_result = rv;
3854         ms->m_nodeid = ret_nodeid;
3855
3856         error = send_message(mh, ms);
3857  out:
3858         return error;
3859 }
3860
3861 /* which args we save from a received message depends heavily on the type
3862    of message, unlike the send side where we can safely send everything about
3863    the lkb for any type of message */
3864
3865 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3866 {
3867         lkb->lkb_exflags = ms->m_exflags;
3868         lkb->lkb_sbflags = ms->m_sbflags;
3869         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3870                          (ms->m_flags & 0x0000FFFF);
3871 }
3872
3873 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3874 {
3875         if (ms->m_flags == DLM_IFL_STUB_MS)
3876                 return;
3877
3878         lkb->lkb_sbflags = ms->m_sbflags;
3879         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3880                          (ms->m_flags & 0x0000FFFF);
3881 }
3882
3883 static int receive_extralen(struct dlm_message *ms)
3884 {
3885         return (ms->m_header.h_length - sizeof(struct dlm_message));
3886 }
3887
3888 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3889                        struct dlm_message *ms)
3890 {
3891         int len;
3892
3893         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3894                 if (!lkb->lkb_lvbptr)
3895                         lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3896                 if (!lkb->lkb_lvbptr)
3897                         return -ENOMEM;
3898                 len = receive_extralen(ms);
3899                 if (len > ls->ls_lvblen)
3900                         len = ls->ls_lvblen;
3901                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3902         }
3903         return 0;
3904 }
3905
3906 static void fake_bastfn(void *astparam, int mode)
3907 {
3908         log_print("fake_bastfn should not be called");
3909 }
3910
3911 static void fake_astfn(void *astparam)
3912 {
3913         log_print("fake_astfn should not be called");
3914 }
3915
3916 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3917                                 struct dlm_message *ms)
3918 {
3919         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3920         lkb->lkb_ownpid = ms->m_pid;
3921         lkb->lkb_remid = ms->m_lkid;
3922         lkb->lkb_grmode = DLM_LOCK_IV;
3923         lkb->lkb_rqmode = ms->m_rqmode;
3924
3925         lkb->lkb_bastfn = (ms->m_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
3926         lkb->lkb_astfn = (ms->m_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
3927
3928         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3929                 /* lkb was just created so there won't be an lvb yet */
3930                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3931                 if (!lkb->lkb_lvbptr)
3932                         return -ENOMEM;
3933         }
3934
3935         return 0;
3936 }
3937
3938 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3939                                 struct dlm_message *ms)
3940 {
3941         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3942                 return -EBUSY;
3943
3944         if (receive_lvb(ls, lkb, ms))
3945                 return -ENOMEM;
3946
3947         lkb->lkb_rqmode = ms->m_rqmode;
3948         lkb->lkb_lvbseq = ms->m_lvbseq;
3949
3950         return 0;
3951 }
3952
3953 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3954                                struct dlm_message *ms)
3955 {
3956         if (receive_lvb(ls, lkb, ms))
3957                 return -ENOMEM;
3958         return 0;
3959 }
3960
3961 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3962    uses to send a reply and that the remote end uses to process the reply. */
3963
3964 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3965 {
3966         struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3967         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3968         lkb->lkb_remid = ms->m_lkid;
3969 }
3970
3971 /* This is called after the rsb is locked so that we can safely inspect
3972    fields in the lkb. */
3973
3974 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3975 {
3976         int from = ms->m_header.h_nodeid;
3977         int error = 0;
3978
3979         /* currently mixing of user/kernel locks are not supported */
3980         if (ms->m_flags & DLM_IFL_USER && ~lkb->lkb_flags & DLM_IFL_USER) {
3981                 log_error(lkb->lkb_resource->res_ls,
3982                           "got user dlm message for a kernel lock");
3983                 error = -EINVAL;
3984                 goto out;
3985         }
3986
3987         switch (ms->m_type) {
3988         case DLM_MSG_CONVERT:
3989         case DLM_MSG_UNLOCK:
3990         case DLM_MSG_CANCEL:
3991                 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3992                         error = -EINVAL;
3993                 break;
3994
3995         case DLM_MSG_CONVERT_REPLY:
3996         case DLM_MSG_UNLOCK_REPLY:
3997         case DLM_MSG_CANCEL_REPLY:
3998         case DLM_MSG_GRANT:
3999         case DLM_MSG_BAST:
4000                 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
4001                         error = -EINVAL;
4002                 break;
4003
4004         case DLM_MSG_REQUEST_REPLY:
4005                 if (!is_process_copy(lkb))
4006                         error = -EINVAL;
4007                 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
4008                         error = -EINVAL;
4009                 break;
4010
4011         default:
4012                 error = -EINVAL;
4013         }
4014
4015 out:
4016         if (error)
4017                 log_error(lkb->lkb_resource->res_ls,
4018                           "ignore invalid message %d from %d %x %x %x %d",
4019                           ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
4020                           lkb->lkb_flags, lkb->lkb_nodeid);
4021         return error;
4022 }
4023
4024 static void send_repeat_remove(struct dlm_ls *ls, char *ms_name, int len)
4025 {
4026         char name[DLM_RESNAME_MAXLEN + 1];
4027         struct dlm_message *ms;
4028         struct dlm_mhandle *mh;
4029         struct dlm_rsb *r;
4030         uint32_t hash, b;
4031         int rv, dir_nodeid;
4032
4033         memset(name, 0, sizeof(name));
4034         memcpy(name, ms_name, len);
4035
4036         hash = jhash(name, len, 0);
4037         b = hash & (ls->ls_rsbtbl_size - 1);
4038
4039         dir_nodeid = dlm_hash2nodeid(ls, hash);
4040
4041         log_error(ls, "send_repeat_remove dir %d %s", dir_nodeid, name);
4042
4043         spin_lock(&ls->ls_rsbtbl[b].lock);
4044         rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
4045         if (!rv) {
4046                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4047                 log_error(ls, "repeat_remove on keep %s", name);
4048                 return;
4049         }
4050
4051         rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
4052         if (!rv) {
4053                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4054                 log_error(ls, "repeat_remove on toss %s", name);
4055                 return;
4056         }
4057
4058         /* use ls->remove_name2 to avoid conflict with shrink? */
4059
4060         spin_lock(&ls->ls_remove_spin);
4061         ls->ls_remove_len = len;
4062         memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
4063         spin_unlock(&ls->ls_remove_spin);
4064         spin_unlock(&ls->ls_rsbtbl[b].lock);
4065
4066         rv = _create_message(ls, sizeof(struct dlm_message) + len,
4067                              dir_nodeid, DLM_MSG_REMOVE, &ms, &mh);
4068         if (rv)
4069                 return;
4070
4071         memcpy(ms->m_extra, name, len);
4072         ms->m_hash = hash;
4073
4074         send_message(mh, ms);
4075
4076         spin_lock(&ls->ls_remove_spin);
4077         ls->ls_remove_len = 0;
4078         memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
4079         spin_unlock(&ls->ls_remove_spin);
4080 }
4081
4082 static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
4083 {
4084         struct dlm_lkb *lkb;
4085         struct dlm_rsb *r;
4086         int from_nodeid;
4087         int error, namelen = 0;
4088
4089         from_nodeid = ms->m_header.h_nodeid;
4090
4091         error = create_lkb(ls, &lkb);
4092         if (error)
4093                 goto fail;
4094
4095         receive_flags(lkb, ms);
4096         lkb->lkb_flags |= DLM_IFL_MSTCPY;
4097         error = receive_request_args(ls, lkb, ms);
4098         if (error) {
4099                 __put_lkb(ls, lkb);
4100                 goto fail;
4101         }
4102
4103         /* The dir node is the authority on whether we are the master
4104            for this rsb or not, so if the master sends us a request, we should
4105            recreate the rsb if we've destroyed it.   This race happens when we
4106            send a remove message to the dir node at the same time that the dir
4107            node sends us a request for the rsb. */
4108
4109         namelen = receive_extralen(ms);
4110
4111         error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
4112                          R_RECEIVE_REQUEST, &r);
4113         if (error) {
4114                 __put_lkb(ls, lkb);
4115                 goto fail;
4116         }
4117
4118         lock_rsb(r);
4119
4120         if (r->res_master_nodeid != dlm_our_nodeid()) {
4121                 error = validate_master_nodeid(ls, r, from_nodeid);
4122                 if (error) {
4123                         unlock_rsb(r);
4124                         put_rsb(r);
4125                         __put_lkb(ls, lkb);
4126                         goto fail;
4127                 }
4128         }
4129
4130         attach_lkb(r, lkb);
4131         error = do_request(r, lkb);
4132         send_request_reply(r, lkb, error);
4133         do_request_effects(r, lkb, error);
4134
4135         unlock_rsb(r);
4136         put_rsb(r);
4137
4138         if (error == -EINPROGRESS)
4139                 error = 0;
4140         if (error)
4141                 dlm_put_lkb(lkb);
4142         return 0;
4143
4144  fail:
4145         /* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
4146            and do this receive_request again from process_lookup_list once
4147            we get the lookup reply.  This would avoid a many repeated
4148            ENOTBLK request failures when the lookup reply designating us
4149            as master is delayed. */
4150
4151         /* We could repeatedly return -EBADR here if our send_remove() is
4152            delayed in being sent/arriving/being processed on the dir node.
4153            Another node would repeatedly lookup up the master, and the dir
4154            node would continue returning our nodeid until our send_remove
4155            took effect.
4156
4157            We send another remove message in case our previous send_remove
4158            was lost/ignored/missed somehow. */
4159
4160         if (error != -ENOTBLK) {
4161                 log_limit(ls, "receive_request %x from %d %d",
4162                           ms->m_lkid, from_nodeid, error);
4163         }
4164
4165         if (namelen && error == -EBADR) {
4166                 send_repeat_remove(ls, ms->m_extra, namelen);
4167                 msleep(1000);
4168         }
4169
4170         setup_stub_lkb(ls, ms);
4171         send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4172         return error;
4173 }
4174
4175 static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
4176 {
4177         struct dlm_lkb *lkb;
4178         struct dlm_rsb *r;
4179         int error, reply = 1;
4180
4181         error = find_lkb(ls, ms->m_remid, &lkb);
4182         if (error)
4183                 goto fail;
4184
4185         if (lkb->lkb_remid != ms->m_lkid) {
4186                 log_error(ls, "receive_convert %x remid %x recover_seq %llu "
4187                           "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
4188                           (unsigned long long)lkb->lkb_recover_seq,
4189                           ms->m_header.h_nodeid, ms->m_lkid);
4190                 error = -ENOENT;
4191                 dlm_put_lkb(lkb);
4192                 goto fail;
4193         }
4194
4195         r = lkb->lkb_resource;
4196
4197         hold_rsb(r);
4198         lock_rsb(r);
4199
4200         error = validate_message(lkb, ms);
4201         if (error)
4202                 goto out;
4203
4204         receive_flags(lkb, ms);
4205
4206         error = receive_convert_args(ls, lkb, ms);
4207         if (error) {
4208                 send_convert_reply(r, lkb, error);
4209                 goto out;
4210         }
4211
4212         reply = !down_conversion(lkb);
4213
4214         error = do_convert(r, lkb);
4215         if (reply)
4216                 send_convert_reply(r, lkb, error);
4217         do_convert_effects(r, lkb, error);
4218  out:
4219         unlock_rsb(r);
4220         put_rsb(r);
4221         dlm_put_lkb(lkb);
4222         return 0;
4223
4224  fail:
4225         setup_stub_lkb(ls, ms);
4226         send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4227         return error;
4228 }
4229
4230 static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
4231 {
4232         struct dlm_lkb *lkb;
4233         struct dlm_rsb *r;
4234         int error;
4235
4236         error = find_lkb(ls, ms->m_remid, &lkb);
4237         if (error)
4238                 goto fail;
4239
4240         if (lkb->lkb_remid != ms->m_lkid) {
4241                 log_error(ls, "receive_unlock %x remid %x remote %d %x",
4242                           lkb->lkb_id, lkb->lkb_remid,
4243                           ms->m_header.h_nodeid, ms->m_lkid);
4244                 error = -ENOENT;
4245                 dlm_put_lkb(lkb);
4246                 goto fail;
4247         }
4248
4249         r = lkb->lkb_resource;
4250
4251         hold_rsb(r);
4252         lock_rsb(r);
4253
4254         error = validate_message(lkb, ms);
4255         if (error)
4256                 goto out;
4257
4258         receive_flags(lkb, ms);
4259
4260         error = receive_unlock_args(ls, lkb, ms);
4261         if (error) {
4262                 send_unlock_reply(r, lkb, error);
4263                 goto out;
4264         }
4265
4266         error = do_unlock(r, lkb);
4267         send_unlock_reply(r, lkb, error);
4268         do_unlock_effects(r, lkb, error);
4269  out:
4270         unlock_rsb(r);
4271         put_rsb(r);
4272         dlm_put_lkb(lkb);
4273         return 0;
4274
4275  fail:
4276         setup_stub_lkb(ls, ms);
4277         send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4278         return error;
4279 }
4280
4281 static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
4282 {
4283         struct dlm_lkb *lkb;
4284         struct dlm_rsb *r;
4285         int error;
4286
4287         error = find_lkb(ls, ms->m_remid, &lkb);
4288         if (error)
4289                 goto fail;
4290
4291         receive_flags(lkb, ms);
4292
4293         r = lkb->lkb_resource;
4294
4295         hold_rsb(r);
4296         lock_rsb(r);
4297
4298         error = validate_message(lkb, ms);
4299         if (error)
4300                 goto out;
4301
4302         error = do_cancel(r, lkb);
4303         send_cancel_reply(r, lkb, error);
4304         do_cancel_effects(r, lkb, error);
4305  out:
4306         unlock_rsb(r);
4307         put_rsb(r);
4308         dlm_put_lkb(lkb);
4309         return 0;
4310
4311  fail:
4312         setup_stub_lkb(ls, ms);
4313         send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4314         return error;
4315 }
4316
4317 static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
4318 {
4319         struct dlm_lkb *lkb;
4320         struct dlm_rsb *r;
4321         int error;
4322
4323         error = find_lkb(ls, ms->m_remid, &lkb);
4324         if (error)
4325                 return error;
4326
4327         r = lkb->lkb_resource;
4328
4329         hold_rsb(r);
4330         lock_rsb(r);
4331
4332         error = validate_message(lkb, ms);
4333         if (error)
4334                 goto out;
4335
4336         receive_flags_reply(lkb, ms);
4337         if (is_altmode(lkb))
4338                 munge_altmode(lkb, ms);
4339         grant_lock_pc(r, lkb, ms);
4340         queue_cast(r, lkb, 0);
4341  out:
4342         unlock_rsb(r);
4343         put_rsb(r);
4344         dlm_put_lkb(lkb);
4345         return 0;
4346 }
4347
4348 static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
4349 {
4350         struct dlm_lkb *lkb;
4351         struct dlm_rsb *r;
4352         int error;
4353
4354         error = find_lkb(ls, ms->m_remid, &lkb);
4355         if (error)
4356                 return error;
4357
4358         r = lkb->lkb_resource;
4359
4360         hold_rsb(r);
4361         lock_rsb(r);
4362
4363         error = validate_message(lkb, ms);
4364         if (error)
4365                 goto out;
4366
4367         queue_bast(r, lkb, ms->m_bastmode);
4368         lkb->lkb_highbast = ms->m_bastmode;
4369  out:
4370         unlock_rsb(r);
4371         put_rsb(r);
4372         dlm_put_lkb(lkb);
4373         return 0;
4374 }
4375
4376 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
4377 {
4378         int len, error, ret_nodeid, from_nodeid, our_nodeid;
4379
4380         from_nodeid = ms->m_header.h_nodeid;
4381         our_nodeid = dlm_our_nodeid();
4382
4383         len = receive_extralen(ms);
4384
4385         error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
4386                                   &ret_nodeid, NULL);
4387
4388         /* Optimization: we're master so treat lookup as a request */
4389         if (!error && ret_nodeid == our_nodeid) {
4390                 receive_request(ls, ms);
4391                 return;
4392         }
4393         send_lookup_reply(ls, ms, ret_nodeid, error);
4394 }
4395
4396 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
4397 {
4398         char name[DLM_RESNAME_MAXLEN+1];
4399         struct dlm_rsb *r;
4400         uint32_t hash, b;
4401         int rv, len, dir_nodeid, from_nodeid;
4402
4403         from_nodeid = ms->m_header.h_nodeid;
4404
4405         len = receive_extralen(ms);
4406
4407         if (len > DLM_RESNAME_MAXLEN) {
4408                 log_error(ls, "receive_remove from %d bad len %d",
4409                           from_nodeid, len);
4410                 return;
4411         }
4412
4413         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
4414         if (dir_nodeid != dlm_our_nodeid()) {
4415                 log_error(ls, "receive_remove from %d bad nodeid %d",
4416                           from_nodeid, dir_nodeid);
4417                 return;
4418         }
4419
4420         /* Look for name on rsbtbl.toss, if it's there, kill it.
4421            If it's on rsbtbl.keep, it's being used, and we should ignore this
4422            message.  This is an expected race between the dir node sending a
4423            request to the master node at the same time as the master node sends
4424            a remove to the dir node.  The resolution to that race is for the
4425            dir node to ignore the remove message, and the master node to
4426            recreate the master rsb when it gets a request from the dir node for
4427            an rsb it doesn't have. */
4428
4429         memset(name, 0, sizeof(name));
4430         memcpy(name, ms->m_extra, len);
4431
4432         hash = jhash(name, len, 0);
4433         b = hash & (ls->ls_rsbtbl_size - 1);
4434
4435         spin_lock(&ls->ls_rsbtbl[b].lock);
4436
4437         rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
4438         if (rv) {
4439                 /* verify the rsb is on keep list per comment above */
4440                 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
4441                 if (rv) {
4442                         /* should not happen */
4443                         log_error(ls, "receive_remove from %d not found %s",
4444                                   from_nodeid, name);
4445                         spin_unlock(&ls->ls_rsbtbl[b].lock);
4446                         return;
4447                 }
4448                 if (r->res_master_nodeid != from_nodeid) {
4449                         /* should not happen */
4450                         log_error(ls, "receive_remove keep from %d master %d",
4451                                   from_nodeid, r->res_master_nodeid);
4452                         dlm_print_rsb(r);
4453                         spin_unlock(&ls->ls_rsbtbl[b].lock);
4454                         return;
4455                 }
4456
4457                 log_debug(ls, "receive_remove from %d master %d first %x %s",
4458                           from_nodeid, r->res_master_nodeid, r->res_first_lkid,
4459                           name);
4460                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4461                 return;
4462         }
4463
4464         if (r->res_master_nodeid != from_nodeid) {
4465                 log_error(ls, "receive_remove toss from %d master %d",
4466                           from_nodeid, r->res_master_nodeid);
4467                 dlm_print_rsb(r);
4468                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4469                 return;
4470         }
4471
4472         if (kref_put(&r->res_ref, kill_rsb)) {
4473                 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
4474                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4475                 dlm_free_rsb(r);
4476         } else {
4477                 log_error(ls, "receive_remove from %d rsb ref error",
4478                           from_nodeid);
4479                 dlm_print_rsb(r);
4480                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4481         }
4482 }
4483
4484 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
4485 {
4486         do_purge(ls, ms->m_nodeid, ms->m_pid);
4487 }
4488
4489 static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
4490 {
4491         struct dlm_lkb *lkb;
4492         struct dlm_rsb *r;
4493         int error, mstype, result;
4494         int from_nodeid = ms->m_header.h_nodeid;
4495
4496         error = find_lkb(ls, ms->m_remid, &lkb);
4497         if (error)
4498                 return error;
4499
4500         r = lkb->lkb_resource;
4501         hold_rsb(r);
4502         lock_rsb(r);
4503
4504         error = validate_message(lkb, ms);
4505         if (error)
4506                 goto out;
4507
4508         mstype = lkb->lkb_wait_type;
4509         error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
4510         if (error) {
4511                 log_error(ls, "receive_request_reply %x remote %d %x result %d",
4512                           lkb->lkb_id, from_nodeid, ms->m_lkid, ms->m_result);
4513                 dlm_dump_rsb(r);
4514                 goto out;
4515         }
4516
4517         /* Optimization: the dir node was also the master, so it took our
4518            lookup as a request and sent request reply instead of lookup reply */
4519         if (mstype == DLM_MSG_LOOKUP) {
4520                 r->res_master_nodeid = from_nodeid;
4521                 r->res_nodeid = from_nodeid;
4522                 lkb->lkb_nodeid = from_nodeid;
4523         }
4524
4525         /* this is the value returned from do_request() on the master */
4526         result = ms->m_result;
4527
4528         switch (result) {
4529         case -EAGAIN:
4530                 /* request would block (be queued) on remote master */
4531                 queue_cast(r, lkb, -EAGAIN);
4532                 confirm_master(r, -EAGAIN);
4533                 unhold_lkb(lkb); /* undoes create_lkb() */
4534                 break;
4535
4536         case -EINPROGRESS:
4537         case 0:
4538                 /* request was queued or granted on remote master */
4539                 receive_flags_reply(lkb, ms);
4540                 lkb->lkb_remid = ms->m_lkid;
4541                 if (is_altmode(lkb))
4542                         munge_altmode(lkb, ms);
4543                 if (result) {
4544                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
4545                         add_timeout(lkb);
4546                 } else {
4547                         grant_lock_pc(r, lkb, ms);
4548                         queue_cast(r, lkb, 0);
4549                 }
4550                 confirm_master(r, result);
4551                 break;
4552
4553         case -EBADR:
4554         case -ENOTBLK:
4555                 /* find_rsb failed to find rsb or rsb wasn't master */
4556                 log_limit(ls, "receive_request_reply %x from %d %d "
4557                           "master %d dir %d first %x %s", lkb->lkb_id,
4558                           from_nodeid, result, r->res_master_nodeid,
4559                           r->res_dir_nodeid, r->res_first_lkid, r->res_name);
4560
4561                 if (r->res_dir_nodeid != dlm_our_nodeid() &&
4562                     r->res_master_nodeid != dlm_our_nodeid()) {
4563                         /* cause _request_lock->set_master->send_lookup */
4564                         r->res_master_nodeid = 0;
4565                         r->res_nodeid = -1;
4566                         lkb->lkb_nodeid = -1;
4567                 }
4568
4569                 if (is_overlap(lkb)) {
4570                         /* we'll ignore error in cancel/unlock reply */
4571                         queue_cast_overlap(r, lkb);
4572                         confirm_master(r, result);
4573                         unhold_lkb(lkb); /* undoes create_lkb() */
4574                 } else {
4575                         _request_lock(r, lkb);
4576
4577                         if (r->res_master_nodeid == dlm_our_nodeid())
4578                                 confirm_master(r, 0);
4579                 }
4580                 break;
4581
4582         default:
4583                 log_error(ls, "receive_request_reply %x error %d",
4584                           lkb->lkb_id, result);
4585         }
4586
4587         if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
4588                 log_debug(ls, "receive_request_reply %x result %d unlock",
4589                           lkb->lkb_id, result);
4590                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4591                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4592                 send_unlock(r, lkb);
4593         } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
4594                 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
4595                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4596                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4597                 send_cancel(r, lkb);
4598         } else {
4599                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4600                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4601         }
4602  out:
4603         unlock_rsb(r);
4604         put_rsb(r);
4605         dlm_put_lkb(lkb);
4606         return 0;
4607 }
4608
4609 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4610                                     struct dlm_message *ms)
4611 {
4612         /* this is the value returned from do_convert() on the master */
4613         switch (ms->m_result) {
4614         case -EAGAIN:
4615                 /* convert would block (be queued) on remote master */
4616                 queue_cast(r, lkb, -EAGAIN);
4617                 break;
4618
4619         case -EDEADLK:
4620                 receive_flags_reply(lkb, ms);
4621                 revert_lock_pc(r, lkb);
4622                 queue_cast(r, lkb, -EDEADLK);
4623                 break;
4624
4625         case -EINPROGRESS:
4626                 /* convert was queued on remote master */
4627                 receive_flags_reply(lkb, ms);
4628                 if (is_demoted(lkb))
4629                         munge_demoted(lkb);
4630                 del_lkb(r, lkb);
4631                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
4632                 add_timeout(lkb);
4633                 break;
4634
4635         case 0:
4636                 /* convert was granted on remote master */
4637                 receive_flags_reply(lkb, ms);
4638                 if (is_demoted(lkb))
4639                         munge_demoted(lkb);
4640                 grant_lock_pc(r, lkb, ms);
4641                 queue_cast(r, lkb, 0);
4642                 break;
4643
4644         default:
4645                 log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
4646                           lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid,
4647                           ms->m_result);
4648                 dlm_print_rsb(r);
4649                 dlm_print_lkb(lkb);
4650         }
4651 }
4652
4653 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4654 {
4655         struct dlm_rsb *r = lkb->lkb_resource;
4656         int error;
4657
4658         hold_rsb(r);
4659         lock_rsb(r);
4660
4661         error = validate_message(lkb, ms);
4662         if (error)
4663                 goto out;
4664
4665         /* stub reply can happen with waiters_mutex held */
4666         error = remove_from_waiters_ms(lkb, ms);
4667         if (error)
4668                 goto out;
4669
4670         __receive_convert_reply(r, lkb, ms);
4671  out:
4672         unlock_rsb(r);
4673         put_rsb(r);
4674 }
4675
4676 static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
4677 {
4678         struct dlm_lkb *lkb;
4679         int error;
4680
4681         error = find_lkb(ls, ms->m_remid, &lkb);
4682         if (error)
4683                 return error;
4684
4685         _receive_convert_reply(lkb, ms);
4686         dlm_put_lkb(lkb);
4687         return 0;
4688 }
4689
4690 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4691 {
4692         struct dlm_rsb *r = lkb->lkb_resource;
4693         int error;
4694
4695         hold_rsb(r);
4696         lock_rsb(r);
4697
4698         error = validate_message(lkb, ms);
4699         if (error)
4700                 goto out;
4701
4702         /* stub reply can happen with waiters_mutex held */
4703         error = remove_from_waiters_ms(lkb, ms);
4704         if (error)
4705                 goto out;
4706
4707         /* this is the value returned from do_unlock() on the master */
4708
4709         switch (ms->m_result) {
4710         case -DLM_EUNLOCK:
4711                 receive_flags_reply(lkb, ms);
4712                 remove_lock_pc(r, lkb);
4713                 queue_cast(r, lkb, -DLM_EUNLOCK);
4714                 break;
4715         case -ENOENT:
4716                 break;
4717         default:
4718                 log_error(r->res_ls, "receive_unlock_reply %x error %d",
4719                           lkb->lkb_id, ms->m_result);
4720         }
4721  out:
4722         unlock_rsb(r);
4723         put_rsb(r);
4724 }
4725
4726 static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
4727 {
4728         struct dlm_lkb *lkb;
4729         int error;
4730
4731         error = find_lkb(ls, ms->m_remid, &lkb);
4732         if (error)
4733                 return error;
4734
4735         _receive_unlock_reply(lkb, ms);
4736         dlm_put_lkb(lkb);
4737         return 0;
4738 }
4739
4740 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4741 {
4742         struct dlm_rsb *r = lkb->lkb_resource;
4743         int error;
4744
4745         hold_rsb(r);
4746         lock_rsb(r);
4747
4748         error = validate_message(lkb, ms);
4749         if (error)
4750                 goto out;
4751
4752         /* stub reply can happen with waiters_mutex held */
4753         error = remove_from_waiters_ms(lkb, ms);
4754         if (error)
4755                 goto out;
4756
4757         /* this is the value returned from do_cancel() on the master */
4758
4759         switch (ms->m_result) {
4760         case -DLM_ECANCEL:
4761                 receive_flags_reply(lkb, ms);
4762                 revert_lock_pc(r, lkb);
4763                 queue_cast(r, lkb, -DLM_ECANCEL);
4764                 break;
4765         case 0:
4766                 break;
4767         default:
4768                 log_error(r->res_ls, "receive_cancel_reply %x error %d",
4769                           lkb->lkb_id, ms->m_result);
4770         }
4771  out:
4772         unlock_rsb(r);
4773         put_rsb(r);
4774 }
4775
4776 static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
4777 {
4778         struct dlm_lkb *lkb;
4779         int error;
4780
4781         error = find_lkb(ls, ms->m_remid, &lkb);
4782         if (error)
4783                 return error;
4784
4785         _receive_cancel_reply(lkb, ms);
4786         dlm_put_lkb(lkb);
4787         return 0;
4788 }
4789
4790 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
4791 {
4792         struct dlm_lkb *lkb;
4793         struct dlm_rsb *r;
4794         int error, ret_nodeid;
4795         int do_lookup_list = 0;
4796
4797         error = find_lkb(ls, ms->m_lkid, &lkb);
4798         if (error) {
4799                 log_error(ls, "receive_lookup_reply no lkid %x", ms->m_lkid);
4800                 return;
4801         }
4802
4803         /* ms->m_result is the value returned by dlm_master_lookup on dir node
4804            FIXME: will a non-zero error ever be returned? */
4805
4806         r = lkb->lkb_resource;
4807         hold_rsb(r);
4808         lock_rsb(r);
4809
4810         error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
4811         if (error)
4812                 goto out;
4813
4814         ret_nodeid = ms->m_nodeid;
4815
4816         /* We sometimes receive a request from the dir node for this
4817            rsb before we've received the dir node's loookup_reply for it.
4818            The request from the dir node implies we're the master, so we set
4819            ourself as master in receive_request_reply, and verify here that
4820            we are indeed the master. */
4821
4822         if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
4823                 /* This should never happen */
4824                 log_error(ls, "receive_lookup_reply %x from %d ret %d "
4825                           "master %d dir %d our %d first %x %s",
4826                           lkb->lkb_id, ms->m_header.h_nodeid, ret_nodeid,
4827                           r->res_master_nodeid, r->res_dir_nodeid,
4828                           dlm_our_nodeid(), r->res_first_lkid, r->res_name);
4829         }
4830
4831         if (ret_nodeid == dlm_our_nodeid()) {
4832                 r->res_master_nodeid = ret_nodeid;
4833                 r->res_nodeid = 0;
4834                 do_lookup_list = 1;
4835                 r->res_first_lkid = 0;
4836         } else if (ret_nodeid == -1) {
4837                 /* the remote node doesn't believe it's the dir node */
4838                 log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
4839                           lkb->lkb_id, ms->m_header.h_nodeid);
4840                 r->res_master_nodeid = 0;
4841                 r->res_nodeid = -1;
4842                 lkb->lkb_nodeid = -1;
4843         } else {
4844                 /* set_master() will set lkb_nodeid from r */
4845                 r->res_master_nodeid = ret_nodeid;
4846                 r->res_nodeid = ret_nodeid;
4847         }
4848
4849         if (is_overlap(lkb)) {
4850                 log_debug(ls, "receive_lookup_reply %x unlock %x",
4851                           lkb->lkb_id, lkb->lkb_flags);
4852                 queue_cast_overlap(r, lkb);
4853                 unhold_lkb(lkb); /* undoes create_lkb() */
4854                 goto out_list;
4855         }
4856
4857         _request_lock(r, lkb);
4858
4859  out_list:
4860         if (do_lookup_list)
4861                 process_lookup_list(r);
4862  out:
4863         unlock_rsb(r);
4864         put_rsb(r);
4865         dlm_put_lkb(lkb);
4866 }
4867
4868 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4869                              uint32_t saved_seq)
4870 {
4871         int error = 0, noent = 0;
4872
4873         if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
4874                 log_limit(ls, "receive %d from non-member %d %x %x %d",
4875                           ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
4876                           ms->m_remid, ms->m_result);
4877                 return;
4878         }
4879
4880         switch (ms->m_type) {
4881
4882         /* messages sent to a master node */
4883
4884         case DLM_MSG_REQUEST:
4885                 error = receive_request(ls, ms);
4886                 break;
4887
4888         case DLM_MSG_CONVERT:
4889                 error = receive_convert(ls, ms);
4890                 break;
4891
4892         case DLM_MSG_UNLOCK:
4893                 error = receive_unlock(ls, ms);
4894                 break;
4895
4896         case DLM_MSG_CANCEL:
4897                 noent = 1;
4898                 error = receive_cancel(ls, ms);
4899                 break;
4900
4901         /* messages sent from a master node (replies to above) */
4902
4903         case DLM_MSG_REQUEST_REPLY:
4904                 error = receive_request_reply(ls, ms);
4905                 break;
4906
4907         case DLM_MSG_CONVERT_REPLY:
4908                 error = receive_convert_reply(ls, ms);
4909                 break;
4910
4911         case DLM_MSG_UNLOCK_REPLY:
4912                 error = receive_unlock_reply(ls, ms);
4913                 break;
4914
4915         case DLM_MSG_CANCEL_REPLY:
4916                 error = receive_cancel_reply(ls, ms);
4917                 break;
4918
4919         /* messages sent from a master node (only two types of async msg) */
4920
4921         case DLM_MSG_GRANT:
4922                 noent = 1;
4923                 error = receive_grant(ls, ms);
4924                 break;
4925
4926         case DLM_MSG_BAST:
4927                 noent = 1;
4928                 error = receive_bast(ls, ms);
4929                 break;
4930
4931         /* messages sent to a dir node */
4932
4933         case DLM_MSG_LOOKUP:
4934                 receive_lookup(ls, ms);
4935                 break;
4936
4937         case DLM_MSG_REMOVE:
4938                 receive_remove(ls, ms);
4939                 break;
4940
4941         /* messages sent from a dir node (remove has no reply) */
4942
4943         case DLM_MSG_LOOKUP_REPLY:
4944                 receive_lookup_reply(ls, ms);
4945                 break;
4946
4947         /* other messages */
4948
4949         case DLM_MSG_PURGE:
4950                 receive_purge(ls, ms);
4951                 break;
4952
4953         default:
4954                 log_error(ls, "unknown message type %d", ms->m_type);
4955         }
4956
4957         /*
4958          * When checking for ENOENT, we're checking the result of
4959          * find_lkb(m_remid):
4960          *
4961          * The lock id referenced in the message wasn't found.  This may
4962          * happen in normal usage for the async messages and cancel, so
4963          * only use log_debug for them.
4964          *
4965          * Some errors are expected and normal.
4966          */
4967
4968         if (error == -ENOENT && noent) {
4969                 log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
4970                           ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
4971                           ms->m_lkid, saved_seq);
4972         } else if (error == -ENOENT) {
4973                 log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
4974                           ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
4975                           ms->m_lkid, saved_seq);
4976
4977                 if (ms->m_type == DLM_MSG_CONVERT)
4978                         dlm_dump_rsb_hash(ls, ms->m_hash);
4979         }
4980
4981         if (error == -EINVAL) {
4982                 log_error(ls, "receive %d inval from %d lkid %x remid %x "
4983                           "saved_seq %u",
4984                           ms->m_type, ms->m_header.h_nodeid,
4985                           ms->m_lkid, ms->m_remid, saved_seq);
4986         }
4987 }
4988
4989 /* If the lockspace is in recovery mode (locking stopped), then normal
4990    messages are saved on the requestqueue for processing after recovery is
4991    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
4992    messages off the requestqueue before we process new ones. This occurs right
4993    after recovery completes when we transition from saving all messages on
4994    requestqueue, to processing all the saved messages, to processing new
4995    messages as they arrive. */
4996
4997 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4998                                 int nodeid)
4999 {
5000         if (dlm_locking_stopped(ls)) {
5001                 /* If we were a member of this lockspace, left, and rejoined,
5002                    other nodes may still be sending us messages from the
5003                    lockspace generation before we left. */
5004                 if (!ls->ls_generation) {
5005                         log_limit(ls, "receive %d from %d ignore old gen",
5006                                   ms->m_type, nodeid);
5007                         return;
5008                 }
5009
5010                 dlm_add_requestqueue(ls, nodeid, ms);
5011         } else {
5012                 dlm_wait_requestqueue(ls);
5013                 _receive_message(ls, ms, 0);
5014         }
5015 }
5016
5017 /* This is called by dlm_recoverd to process messages that were saved on
5018    the requestqueue. */
5019
5020 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
5021                                uint32_t saved_seq)
5022 {
5023         _receive_message(ls, ms, saved_seq);
5024 }
5025
5026 /* This is called by the midcomms layer when something is received for
5027    the lockspace.  It could be either a MSG (normal message sent as part of
5028    standard locking activity) or an RCOM (recovery message sent as part of
5029    lockspace recovery). */
5030
5031 void dlm_receive_buffer(union dlm_packet *p, int nodeid)
5032 {
5033         struct dlm_header *hd = &p->header;
5034         struct dlm_ls *ls;
5035         int type = 0;
5036
5037         switch (hd->h_cmd) {
5038         case DLM_MSG:
5039                 dlm_message_in(&p->message);
5040                 type = p->message.m_type;
5041                 break;
5042         case DLM_RCOM:
5043                 dlm_rcom_in(&p->rcom);
5044                 type = p->rcom.rc_type;
5045                 break;
5046         default:
5047                 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
5048                 return;
5049         }
5050
5051         if (hd->h_nodeid != nodeid) {
5052                 log_print("invalid h_nodeid %d from %d lockspace %x",
5053                           hd->h_nodeid, nodeid, hd->h_lockspace);
5054                 return;
5055         }
5056
5057         ls = dlm_find_lockspace_global(hd->h_lockspace);
5058         if (!ls) {
5059                 if (dlm_config.ci_log_debug) {
5060                         printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
5061                                 "%u from %d cmd %d type %d\n",
5062                                 hd->h_lockspace, nodeid, hd->h_cmd, type);
5063                 }
5064
5065                 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
5066                         dlm_send_ls_not_ready(nodeid, &p->rcom);
5067                 return;
5068         }
5069
5070         /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
5071            be inactive (in this ls) before transitioning to recovery mode */
5072
5073         down_read(&ls->ls_recv_active);
5074         if (hd->h_cmd == DLM_MSG)
5075                 dlm_receive_message(ls, &p->message, nodeid);
5076         else
5077                 dlm_receive_rcom(ls, &p->rcom, nodeid);
5078         up_read(&ls->ls_recv_active);
5079
5080         dlm_put_lockspace(ls);
5081 }
5082
5083 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
5084                                    struct dlm_message *ms_stub)
5085 {
5086         if (middle_conversion(lkb)) {
5087                 hold_lkb(lkb);
5088                 memset(ms_stub, 0, sizeof(struct dlm_message));
5089                 ms_stub->m_flags = DLM_IFL_STUB_MS;
5090                 ms_stub->m_type = DLM_MSG_CONVERT_REPLY;
5091                 ms_stub->m_result = -EINPROGRESS;
5092                 ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
5093                 _receive_convert_reply(lkb, ms_stub);
5094
5095                 /* Same special case as in receive_rcom_lock_args() */
5096                 lkb->lkb_grmode = DLM_LOCK_IV;
5097                 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
5098                 unhold_lkb(lkb);
5099
5100         } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
5101                 lkb->lkb_flags |= DLM_IFL_RESEND;
5102         }
5103
5104         /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
5105            conversions are async; there's no reply from the remote master */
5106 }
5107
5108 /* A waiting lkb needs recovery if the master node has failed, or
5109    the master node is changing (only when no directory is used) */
5110
5111 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
5112                                  int dir_nodeid)
5113 {
5114         if (dlm_no_directory(ls))
5115                 return 1;
5116
5117         if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
5118                 return 1;
5119
5120         return 0;
5121 }
5122
5123 /* Recovery for locks that are waiting for replies from nodes that are now
5124    gone.  We can just complete unlocks and cancels by faking a reply from the
5125    dead node.  Requests and up-conversions we flag to be resent after
5126    recovery.  Down-conversions can just be completed with a fake reply like
5127    unlocks.  Conversions between PR and CW need special attention. */
5128
5129 void dlm_recover_waiters_pre(struct dlm_ls *ls)
5130 {
5131         struct dlm_lkb *lkb, *safe;
5132         struct dlm_message *ms_stub;
5133         int wait_type, stub_unlock_result, stub_cancel_result;
5134         int dir_nodeid;
5135
5136         ms_stub = kmalloc(sizeof(*ms_stub), GFP_KERNEL);
5137         if (!ms_stub)
5138                 return;
5139
5140         mutex_lock(&ls->ls_waiters_mutex);
5141
5142         list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
5143
5144                 dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
5145
5146                 /* exclude debug messages about unlocks because there can be so
5147                    many and they aren't very interesting */
5148
5149                 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
5150                         log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5151                                   "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
5152                                   lkb->lkb_id,
5153                                   lkb->lkb_remid,
5154                                   lkb->lkb_wait_type,
5155                                   lkb->lkb_resource->res_nodeid,
5156                                   lkb->lkb_nodeid,
5157                                   lkb->lkb_wait_nodeid,
5158                                   dir_nodeid);
5159                 }
5160
5161                 /* all outstanding lookups, regardless of destination  will be
5162                    resent after recovery is done */
5163
5164                 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
5165                         lkb->lkb_flags |= DLM_IFL_RESEND;
5166                         continue;
5167                 }
5168
5169                 if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
5170                         continue;
5171
5172                 wait_type = lkb->lkb_wait_type;
5173                 stub_unlock_result = -DLM_EUNLOCK;
5174                 stub_cancel_result = -DLM_ECANCEL;
5175
5176                 /* Main reply may have been received leaving a zero wait_type,
5177                    but a reply for the overlapping op may not have been
5178                    received.  In that case we need to fake the appropriate
5179                    reply for the overlap op. */
5180
5181                 if (!wait_type) {
5182                         if (is_overlap_cancel(lkb)) {
5183                                 wait_type = DLM_MSG_CANCEL;
5184                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
5185                                         stub_cancel_result = 0;
5186                         }
5187                         if (is_overlap_unlock(lkb)) {
5188                                 wait_type = DLM_MSG_UNLOCK;
5189                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
5190                                         stub_unlock_result = -ENOENT;
5191                         }
5192
5193                         log_debug(ls, "rwpre overlap %x %x %d %d %d",
5194                                   lkb->lkb_id, lkb->lkb_flags, wait_type,
5195                                   stub_cancel_result, stub_unlock_result);
5196                 }
5197
5198                 switch (wait_type) {
5199
5200                 case DLM_MSG_REQUEST:
5201                         lkb->lkb_flags |= DLM_IFL_RESEND;
5202                         break;
5203
5204                 case DLM_MSG_CONVERT:
5205                         recover_convert_waiter(ls, lkb, ms_stub);
5206                         break;
5207
5208                 case DLM_MSG_UNLOCK:
5209                         hold_lkb(lkb);
5210                         memset(ms_stub, 0, sizeof(struct dlm_message));
5211                         ms_stub->m_flags = DLM_IFL_STUB_MS;
5212                         ms_stub->m_type = DLM_MSG_UNLOCK_REPLY;
5213                         ms_stub->m_result = stub_unlock_result;
5214                         ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
5215                         _receive_unlock_reply(lkb, ms_stub);
5216                         dlm_put_lkb(lkb);
5217                         break;
5218
5219                 case DLM_MSG_CANCEL:
5220                         hold_lkb(lkb);
5221                         memset(ms_stub, 0, sizeof(struct dlm_message));
5222                         ms_stub->m_flags = DLM_IFL_STUB_MS;
5223                         ms_stub->m_type = DLM_MSG_CANCEL_REPLY;
5224                         ms_stub->m_result = stub_cancel_result;
5225                         ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
5226                         _receive_cancel_reply(lkb, ms_stub);
5227                         dlm_put_lkb(lkb);
5228                         break;
5229
5230                 default:
5231                         log_error(ls, "invalid lkb wait_type %d %d",
5232                                   lkb->lkb_wait_type, wait_type);
5233                 }
5234                 schedule();
5235         }
5236         mutex_unlock(&ls->ls_waiters_mutex);
5237         kfree(ms_stub);
5238 }
5239
5240 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
5241 {
5242         struct dlm_lkb *lkb;
5243         int found = 0;
5244
5245         mutex_lock(&ls->ls_waiters_mutex);
5246         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
5247                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
5248                         hold_lkb(lkb);
5249                         found = 1;
5250                         break;
5251                 }
5252         }
5253         mutex_unlock(&ls->ls_waiters_mutex);
5254
5255         if (!found)
5256                 lkb = NULL;
5257         return lkb;
5258 }
5259
5260 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
5261    master or dir-node for r.  Processing the lkb may result in it being placed
5262    back on waiters. */
5263
5264 /* We do this after normal locking has been enabled and any saved messages
5265    (in requestqueue) have been processed.  We should be confident that at
5266    this point we won't get or process a reply to any of these waiting
5267    operations.  But, new ops may be coming in on the rsbs/locks here from
5268    userspace or remotely. */
5269
5270 /* there may have been an overlap unlock/cancel prior to recovery or after
5271    recovery.  if before, the lkb may still have a pos wait_count; if after, the
5272    overlap flag would just have been set and nothing new sent.  we can be
5273    confident here than any replies to either the initial op or overlap ops
5274    prior to recovery have been received. */
5275
5276 int dlm_recover_waiters_post(struct dlm_ls *ls)
5277 {
5278         struct dlm_lkb *lkb;
5279         struct dlm_rsb *r;
5280         int error = 0, mstype, err, oc, ou;
5281
5282         while (1) {
5283                 if (dlm_locking_stopped(ls)) {
5284                         log_debug(ls, "recover_waiters_post aborted");
5285                         error = -EINTR;
5286                         break;
5287                 }
5288
5289                 lkb = find_resend_waiter(ls);
5290                 if (!lkb)
5291                         break;
5292
5293                 r = lkb->lkb_resource;
5294                 hold_rsb(r);
5295                 lock_rsb(r);
5296
5297                 mstype = lkb->lkb_wait_type;
5298                 oc = is_overlap_cancel(lkb);
5299                 ou = is_overlap_unlock(lkb);
5300                 err = 0;
5301
5302                 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5303                           "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
5304                           "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
5305                           r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
5306                           dlm_dir_nodeid(r), oc, ou);
5307
5308                 /* At this point we assume that we won't get a reply to any
5309                    previous op or overlap op on this lock.  First, do a big
5310                    remove_from_waiters() for all previous ops. */
5311
5312                 lkb->lkb_flags &= ~DLM_IFL_RESEND;
5313                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
5314                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
5315                 lkb->lkb_wait_type = 0;
5316                 /* drop all wait_count references we still
5317                  * hold a reference for this iteration.
5318                  */
5319                 while (lkb->lkb_wait_count) {
5320                         lkb->lkb_wait_count--;
5321                         unhold_lkb(lkb);
5322                 }
5323                 mutex_lock(&ls->ls_waiters_mutex);
5324                 list_del_init(&lkb->lkb_wait_reply);
5325                 mutex_unlock(&ls->ls_waiters_mutex);
5326
5327                 if (oc || ou) {
5328                         /* do an unlock or cancel instead of resending */
5329                         switch (mstype) {
5330                         case DLM_MSG_LOOKUP:
5331                         case DLM_MSG_REQUEST:
5332                                 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
5333                                                         -DLM_ECANCEL);
5334                                 unhold_lkb(lkb); /* undoes create_lkb() */
5335                                 break;
5336                         case DLM_MSG_CONVERT:
5337                                 if (oc) {
5338                                         queue_cast(r, lkb, -DLM_ECANCEL);
5339                                 } else {
5340                                         lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
5341                                         _unlock_lock(r, lkb);
5342                                 }
5343                                 break;
5344                         default:
5345                                 err = 1;
5346                         }
5347                 } else {
5348                         switch (mstype) {
5349                         case DLM_MSG_LOOKUP:
5350                         case DLM_MSG_REQUEST:
5351                                 _request_lock(r, lkb);
5352                                 if (is_master(r))
5353                                         confirm_master(r, 0);
5354                                 break;
5355                         case DLM_MSG_CONVERT:
5356                                 _convert_lock(r, lkb);
5357                                 break;
5358                         default:
5359                                 err = 1;
5360                         }
5361                 }
5362
5363                 if (err) {
5364                         log_error(ls, "waiter %x msg %d r_nodeid %d "
5365                                   "dir_nodeid %d overlap %d %d",
5366                                   lkb->lkb_id, mstype, r->res_nodeid,
5367                                   dlm_dir_nodeid(r), oc, ou);
5368                 }
5369                 unlock_rsb(r);
5370                 put_rsb(r);
5371                 dlm_put_lkb(lkb);
5372         }
5373
5374         return error;
5375 }
5376
5377 static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
5378                               struct list_head *list)
5379 {
5380         struct dlm_lkb *lkb, *safe;
5381
5382         list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5383                 if (!is_master_copy(lkb))
5384                         continue;
5385
5386                 /* don't purge lkbs we've added in recover_master_copy for
5387                    the current recovery seq */
5388
5389                 if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5390                         continue;
5391
5392                 del_lkb(r, lkb);
5393
5394                 /* this put should free the lkb */
5395                 if (!dlm_put_lkb(lkb))
5396                         log_error(ls, "purged mstcpy lkb not released");
5397         }
5398 }
5399
5400 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
5401 {
5402         struct dlm_ls *ls = r->res_ls;
5403
5404         purge_mstcpy_list(ls, r, &r->res_grantqueue);
5405         purge_mstcpy_list(ls, r, &r->res_convertqueue);
5406         purge_mstcpy_list(ls, r, &r->res_waitqueue);
5407 }
5408
5409 static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
5410                             struct list_head *list,
5411                             int nodeid_gone, unsigned int *count)
5412 {
5413         struct dlm_lkb *lkb, *safe;
5414
5415         list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5416                 if (!is_master_copy(lkb))
5417                         continue;
5418
5419                 if ((lkb->lkb_nodeid == nodeid_gone) ||
5420                     dlm_is_removed(ls, lkb->lkb_nodeid)) {
5421
5422                         /* tell recover_lvb to invalidate the lvb
5423                            because a node holding EX/PW failed */
5424                         if ((lkb->lkb_exflags & DLM_LKF_VALBLK) &&
5425                             (lkb->lkb_grmode >= DLM_LOCK_PW)) {
5426                                 rsb_set_flag(r, RSB_RECOVER_LVB_INVAL);
5427                         }
5428
5429                         del_lkb(r, lkb);
5430
5431                         /* this put should free the lkb */
5432                         if (!dlm_put_lkb(lkb))
5433                                 log_error(ls, "purged dead lkb not released");
5434
5435                         rsb_set_flag(r, RSB_RECOVER_GRANT);
5436
5437                         (*count)++;
5438                 }
5439         }
5440 }
5441
5442 /* Get rid of locks held by nodes that are gone. */
5443
5444 void dlm_recover_purge(struct dlm_ls *ls)
5445 {
5446         struct dlm_rsb *r;
5447         struct dlm_member *memb;
5448         int nodes_count = 0;
5449         int nodeid_gone = 0;
5450         unsigned int lkb_count = 0;
5451
5452         /* cache one removed nodeid to optimize the common
5453            case of a single node removed */
5454
5455         list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
5456                 nodes_count++;
5457                 nodeid_gone = memb->nodeid;
5458         }
5459
5460         if (!nodes_count)
5461                 return;
5462
5463         down_write(&ls->ls_root_sem);
5464         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
5465                 hold_rsb(r);
5466                 lock_rsb(r);
5467                 if (is_master(r)) {
5468                         purge_dead_list(ls, r, &r->res_grantqueue,
5469                                         nodeid_gone, &lkb_count);
5470                         purge_dead_list(ls, r, &r->res_convertqueue,
5471                                         nodeid_gone, &lkb_count);
5472                         purge_dead_list(ls, r, &r->res_waitqueue,
5473                                         nodeid_gone, &lkb_count);
5474                 }
5475                 unlock_rsb(r);
5476                 unhold_rsb(r);
5477                 cond_resched();
5478         }
5479         up_write(&ls->ls_root_sem);
5480
5481         if (lkb_count)
5482                 log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes",
5483                           lkb_count, nodes_count);
5484 }
5485
5486 static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
5487 {
5488         struct rb_node *n;
5489         struct dlm_rsb *r;
5490
5491         spin_lock(&ls->ls_rsbtbl[bucket].lock);
5492         for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
5493                 r = rb_entry(n, struct dlm_rsb, res_hashnode);
5494
5495                 if (!rsb_flag(r, RSB_RECOVER_GRANT))
5496                         continue;
5497                 if (!is_master(r)) {
5498                         rsb_clear_flag(r, RSB_RECOVER_GRANT);
5499                         continue;
5500                 }
5501                 hold_rsb(r);
5502                 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5503                 return r;
5504         }
5505         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5506         return NULL;
5507 }
5508
5509 /*
5510  * Attempt to grant locks on resources that we are the master of.
5511  * Locks may have become grantable during recovery because locks
5512  * from departed nodes have been purged (or not rebuilt), allowing
5513  * previously blocked locks to now be granted.  The subset of rsb's
5514  * we are interested in are those with lkb's on either the convert or
5515  * waiting queues.
5516  *
5517  * Simplest would be to go through each master rsb and check for non-empty
5518  * convert or waiting queues, and attempt to grant on those rsbs.
5519  * Checking the queues requires lock_rsb, though, for which we'd need
5520  * to release the rsbtbl lock.  This would make iterating through all
5521  * rsb's very inefficient.  So, we rely on earlier recovery routines
5522  * to set RECOVER_GRANT on any rsb's that we should attempt to grant
5523  * locks for.
5524  */
5525
5526 void dlm_recover_grant(struct dlm_ls *ls)
5527 {
5528         struct dlm_rsb *r;
5529         int bucket = 0;
5530         unsigned int count = 0;
5531         unsigned int rsb_count = 0;
5532         unsigned int lkb_count = 0;
5533
5534         while (1) {
5535                 r = find_grant_rsb(ls, bucket);
5536                 if (!r) {
5537                         if (bucket == ls->ls_rsbtbl_size - 1)
5538                                 break;
5539                         bucket++;
5540                         continue;
5541                 }
5542                 rsb_count++;
5543                 count = 0;
5544                 lock_rsb(r);
5545                 /* the RECOVER_GRANT flag is checked in the grant path */
5546                 grant_pending_locks(r, &count);
5547                 rsb_clear_flag(r, RSB_RECOVER_GRANT);
5548                 lkb_count += count;
5549                 confirm_master(r, 0);
5550                 unlock_rsb(r);
5551                 put_rsb(r);
5552                 cond_resched();
5553         }
5554
5555         if (lkb_count)
5556                 log_rinfo(ls, "dlm_recover_grant %u locks on %u resources",
5557                           lkb_count, rsb_count);
5558 }
5559
5560 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
5561                                          uint32_t remid)
5562 {
5563         struct dlm_lkb *lkb;
5564
5565         list_for_each_entry(lkb, head, lkb_statequeue) {
5566                 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
5567                         return lkb;
5568         }
5569         return NULL;
5570 }
5571
5572 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
5573                                     uint32_t remid)
5574 {
5575         struct dlm_lkb *lkb;
5576
5577         lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
5578         if (lkb)
5579                 return lkb;
5580         lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
5581         if (lkb)
5582                 return lkb;
5583         lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
5584         if (lkb)
5585                 return lkb;
5586         return NULL;
5587 }
5588
5589 /* needs at least dlm_rcom + rcom_lock */
5590 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5591                                   struct dlm_rsb *r, struct dlm_rcom *rc)
5592 {
5593         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5594
5595         lkb->lkb_nodeid = rc->rc_header.h_nodeid;
5596         lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5597         lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5598         lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5599         lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
5600         lkb->lkb_flags |= DLM_IFL_MSTCPY;
5601         lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
5602         lkb->lkb_rqmode = rl->rl_rqmode;
5603         lkb->lkb_grmode = rl->rl_grmode;
5604         /* don't set lkb_status because add_lkb wants to itself */
5605
5606         lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
5607         lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
5608
5609         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
5610                 int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
5611                          sizeof(struct rcom_lock);
5612                 if (lvblen > ls->ls_lvblen)
5613                         return -EINVAL;
5614                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
5615                 if (!lkb->lkb_lvbptr)
5616                         return -ENOMEM;
5617                 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
5618         }
5619
5620         /* Conversions between PR and CW (middle modes) need special handling.
5621            The real granted mode of these converting locks cannot be determined
5622            until all locks have been rebuilt on the rsb (recover_conversion) */
5623
5624         if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
5625             middle_conversion(lkb)) {
5626                 rl->rl_status = DLM_LKSTS_CONVERT;
5627                 lkb->lkb_grmode = DLM_LOCK_IV;
5628                 rsb_set_flag(r, RSB_RECOVER_CONVERT);
5629         }
5630
5631         return 0;
5632 }
5633
5634 /* This lkb may have been recovered in a previous aborted recovery so we need
5635    to check if the rsb already has an lkb with the given remote nodeid/lkid.
5636    If so we just send back a standard reply.  If not, we create a new lkb with
5637    the given values and send back our lkid.  We send back our lkid by sending
5638    back the rcom_lock struct we got but with the remid field filled in. */
5639
5640 /* needs at least dlm_rcom + rcom_lock */
5641 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
5642 {
5643         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5644         struct dlm_rsb *r;
5645         struct dlm_lkb *lkb;
5646         uint32_t remid = 0;
5647         int from_nodeid = rc->rc_header.h_nodeid;
5648         int error;
5649
5650         if (rl->rl_parent_lkid) {
5651                 error = -EOPNOTSUPP;
5652                 goto out;
5653         }
5654
5655         remid = le32_to_cpu(rl->rl_lkid);
5656
5657         /* In general we expect the rsb returned to be R_MASTER, but we don't
5658            have to require it.  Recovery of masters on one node can overlap
5659            recovery of locks on another node, so one node can send us MSTCPY
5660            locks before we've made ourselves master of this rsb.  We can still
5661            add new MSTCPY locks that we receive here without any harm; when
5662            we make ourselves master, dlm_recover_masters() won't touch the
5663            MSTCPY locks we've received early. */
5664
5665         error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
5666                          from_nodeid, R_RECEIVE_RECOVER, &r);
5667         if (error)
5668                 goto out;
5669
5670         lock_rsb(r);
5671
5672         if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
5673                 log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
5674                           from_nodeid, remid);
5675                 error = -EBADR;
5676                 goto out_unlock;
5677         }
5678
5679         lkb = search_remid(r, from_nodeid, remid);
5680         if (lkb) {
5681                 error = -EEXIST;
5682                 goto out_remid;
5683         }
5684
5685         error = create_lkb(ls, &lkb);
5686         if (error)
5687                 goto out_unlock;
5688
5689         error = receive_rcom_lock_args(ls, lkb, r, rc);
5690         if (error) {
5691                 __put_lkb(ls, lkb);
5692                 goto out_unlock;
5693         }
5694
5695         attach_lkb(r, lkb);
5696         add_lkb(r, lkb, rl->rl_status);
5697         error = 0;
5698         ls->ls_recover_locks_in++;
5699
5700         if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
5701                 rsb_set_flag(r, RSB_RECOVER_GRANT);
5702
5703  out_remid:
5704         /* this is the new value returned to the lock holder for
5705            saving in its process-copy lkb */
5706         rl->rl_remid = cpu_to_le32(lkb->lkb_id);
5707
5708         lkb->lkb_recover_seq = ls->ls_recover_seq;
5709
5710  out_unlock:
5711         unlock_rsb(r);
5712         put_rsb(r);
5713  out:
5714         if (error && error != -EEXIST)
5715                 log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d",
5716                           from_nodeid, remid, error);
5717         rl->rl_result = cpu_to_le32(error);
5718         return error;
5719 }
5720
5721 /* needs at least dlm_rcom + rcom_lock */
5722 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
5723 {
5724         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5725         struct dlm_rsb *r;
5726         struct dlm_lkb *lkb;
5727         uint32_t lkid, remid;
5728         int error, result;
5729
5730         lkid = le32_to_cpu(rl->rl_lkid);
5731         remid = le32_to_cpu(rl->rl_remid);
5732         result = le32_to_cpu(rl->rl_result);
5733
5734         error = find_lkb(ls, lkid, &lkb);
5735         if (error) {
5736                 log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
5737                           lkid, rc->rc_header.h_nodeid, remid, result);
5738                 return error;
5739         }
5740
5741         r = lkb->lkb_resource;
5742         hold_rsb(r);
5743         lock_rsb(r);
5744
5745         if (!is_process_copy(lkb)) {
5746                 log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
5747                           lkid, rc->rc_header.h_nodeid, remid, result);
5748                 dlm_dump_rsb(r);
5749                 unlock_rsb(r);
5750                 put_rsb(r);
5751                 dlm_put_lkb(lkb);
5752                 return -EINVAL;
5753         }
5754
5755         switch (result) {
5756         case -EBADR:
5757                 /* There's a chance the new master received our lock before
5758                    dlm_recover_master_reply(), this wouldn't happen if we did
5759                    a barrier between recover_masters and recover_locks. */
5760
5761                 log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
5762                           lkid, rc->rc_header.h_nodeid, remid, result);
5763         
5764                 dlm_send_rcom_lock(r, lkb);
5765                 goto out;
5766         case -EEXIST:
5767         case 0:
5768                 lkb->lkb_remid = remid;
5769                 break;
5770         default:
5771                 log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
5772                           lkid, rc->rc_header.h_nodeid, remid, result);
5773         }
5774
5775         /* an ack for dlm_recover_locks() which waits for replies from
5776            all the locks it sends to new masters */
5777         dlm_recovered_lock(r);
5778  out:
5779         unlock_rsb(r);
5780         put_rsb(r);
5781         dlm_put_lkb(lkb);
5782
5783         return 0;
5784 }
5785
5786 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
5787                      int mode, uint32_t flags, void *name, unsigned int namelen,
5788                      unsigned long timeout_cs)
5789 {
5790         struct dlm_lkb *lkb;
5791         struct dlm_args args;
5792         int error;
5793
5794         dlm_lock_recovery(ls);
5795
5796         error = create_lkb(ls, &lkb);
5797         if (error) {
5798                 kfree(ua);
5799                 goto out;
5800         }
5801
5802         if (flags & DLM_LKF_VALBLK) {
5803                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5804                 if (!ua->lksb.sb_lvbptr) {
5805                         kfree(ua);
5806                         __put_lkb(ls, lkb);
5807                         error = -ENOMEM;
5808                         goto out;
5809                 }
5810         }
5811         error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
5812                               fake_astfn, ua, fake_bastfn, &args);
5813         if (error) {
5814                 kfree(ua->lksb.sb_lvbptr);
5815                 ua->lksb.sb_lvbptr = NULL;
5816                 kfree(ua);
5817                 __put_lkb(ls, lkb);
5818                 goto out;
5819         }
5820
5821         /* After ua is attached to lkb it will be freed by dlm_free_lkb().
5822            When DLM_IFL_USER is set, the dlm knows that this is a userspace
5823            lock and that lkb_astparam is the dlm_user_args structure. */
5824         lkb->lkb_flags |= DLM_IFL_USER;
5825         error = request_lock(ls, lkb, name, namelen, &args);
5826
5827         switch (error) {
5828         case 0:
5829                 break;
5830         case -EINPROGRESS:
5831                 error = 0;
5832                 break;
5833         case -EAGAIN:
5834                 error = 0;
5835                 /* fall through */
5836         default:
5837                 __put_lkb(ls, lkb);
5838                 goto out;
5839         }
5840
5841         /* add this new lkb to the per-process list of locks */
5842         spin_lock(&ua->proc->locks_spin);
5843         hold_lkb(lkb);
5844         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5845         spin_unlock(&ua->proc->locks_spin);
5846  out:
5847         dlm_unlock_recovery(ls);
5848         return error;
5849 }
5850
5851 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5852                      int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
5853                      unsigned long timeout_cs)
5854 {
5855         struct dlm_lkb *lkb;
5856         struct dlm_args args;
5857         struct dlm_user_args *ua;
5858         int error;
5859
5860         dlm_lock_recovery(ls);
5861
5862         error = find_lkb(ls, lkid, &lkb);
5863         if (error)
5864                 goto out;
5865
5866         /* user can change the params on its lock when it converts it, or
5867            add an lvb that didn't exist before */
5868
5869         ua = lkb->lkb_ua;
5870
5871         if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
5872                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5873                 if (!ua->lksb.sb_lvbptr) {
5874                         error = -ENOMEM;
5875                         goto out_put;
5876                 }
5877         }
5878         if (lvb_in && ua->lksb.sb_lvbptr)
5879                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5880
5881         ua->xid = ua_tmp->xid;
5882         ua->castparam = ua_tmp->castparam;
5883         ua->castaddr = ua_tmp->castaddr;
5884         ua->bastparam = ua_tmp->bastparam;
5885         ua->bastaddr = ua_tmp->bastaddr;
5886         ua->user_lksb = ua_tmp->user_lksb;
5887
5888         error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
5889                               fake_astfn, ua, fake_bastfn, &args);
5890         if (error)
5891                 goto out_put;
5892
5893         error = convert_lock(ls, lkb, &args);
5894
5895         if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
5896                 error = 0;
5897  out_put:
5898         dlm_put_lkb(lkb);
5899  out:
5900         dlm_unlock_recovery(ls);
5901         kfree(ua_tmp);
5902         return error;
5903 }
5904
5905 /*
5906  * The caller asks for an orphan lock on a given resource with a given mode.
5907  * If a matching lock exists, it's moved to the owner's list of locks and
5908  * the lkid is returned.
5909  */
5910
5911 int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5912                      int mode, uint32_t flags, void *name, unsigned int namelen,
5913                      unsigned long timeout_cs, uint32_t *lkid)
5914 {
5915         struct dlm_lkb *lkb;
5916         struct dlm_user_args *ua;
5917         int found_other_mode = 0;
5918         int found = 0;
5919         int rv = 0;
5920
5921         mutex_lock(&ls->ls_orphans_mutex);
5922         list_for_each_entry(lkb, &ls->ls_orphans, lkb_ownqueue) {
5923                 if (lkb->lkb_resource->res_length != namelen)
5924                         continue;
5925                 if (memcmp(lkb->lkb_resource->res_name, name, namelen))
5926                         continue;
5927                 if (lkb->lkb_grmode != mode) {
5928                         found_other_mode = 1;
5929                         continue;
5930                 }
5931
5932                 found = 1;
5933                 list_del_init(&lkb->lkb_ownqueue);
5934                 lkb->lkb_flags &= ~DLM_IFL_ORPHAN;
5935                 *lkid = lkb->lkb_id;
5936                 break;
5937         }
5938         mutex_unlock(&ls->ls_orphans_mutex);
5939
5940         if (!found && found_other_mode) {
5941                 rv = -EAGAIN;
5942                 goto out;
5943         }
5944
5945         if (!found) {
5946                 rv = -ENOENT;
5947                 goto out;
5948         }
5949
5950         lkb->lkb_exflags = flags;
5951         lkb->lkb_ownpid = (int) current->pid;
5952
5953         ua = lkb->lkb_ua;
5954
5955         ua->proc = ua_tmp->proc;
5956         ua->xid = ua_tmp->xid;
5957         ua->castparam = ua_tmp->castparam;
5958         ua->castaddr = ua_tmp->castaddr;
5959         ua->bastparam = ua_tmp->bastparam;
5960         ua->bastaddr = ua_tmp->bastaddr;
5961         ua->user_lksb = ua_tmp->user_lksb;
5962
5963         /*
5964          * The lkb reference from the ls_orphans list was not
5965          * removed above, and is now considered the reference
5966          * for the proc locks list.
5967          */
5968
5969         spin_lock(&ua->proc->locks_spin);
5970         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5971         spin_unlock(&ua->proc->locks_spin);
5972  out:
5973         kfree(ua_tmp);
5974         return rv;
5975 }
5976
5977 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5978                     uint32_t flags, uint32_t lkid, char *lvb_in)
5979 {
5980         struct dlm_lkb *lkb;
5981         struct dlm_args args;
5982         struct dlm_user_args *ua;
5983         int error;
5984
5985         dlm_lock_recovery(ls);
5986
5987         error = find_lkb(ls, lkid, &lkb);
5988         if (error)
5989                 goto out;
5990
5991         ua = lkb->lkb_ua;
5992
5993         if (lvb_in && ua->lksb.sb_lvbptr)
5994                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5995         if (ua_tmp->castparam)
5996                 ua->castparam = ua_tmp->castparam;
5997         ua->user_lksb = ua_tmp->user_lksb;
5998
5999         error = set_unlock_args(flags, ua, &args);
6000         if (error)
6001                 goto out_put;
6002
6003         error = unlock_lock(ls, lkb, &args);
6004
6005         if (error == -DLM_EUNLOCK)
6006                 error = 0;
6007         /* from validate_unlock_args() */
6008         if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
6009                 error = 0;
6010         if (error)
6011                 goto out_put;
6012
6013         spin_lock(&ua->proc->locks_spin);
6014         /* dlm_user_add_cb() may have already taken lkb off the proc list */
6015         if (!list_empty(&lkb->lkb_ownqueue))
6016                 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
6017         spin_unlock(&ua->proc->locks_spin);
6018  out_put:
6019         dlm_put_lkb(lkb);
6020  out:
6021         dlm_unlock_recovery(ls);
6022         kfree(ua_tmp);
6023         return error;
6024 }
6025
6026 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
6027                     uint32_t flags, uint32_t lkid)
6028 {
6029         struct dlm_lkb *lkb;
6030         struct dlm_args args;
6031         struct dlm_user_args *ua;
6032         int error;
6033
6034         dlm_lock_recovery(ls);
6035
6036         error = find_lkb(ls, lkid, &lkb);
6037         if (error)
6038                 goto out;
6039
6040         ua = lkb->lkb_ua;
6041         if (ua_tmp->castparam)
6042                 ua->castparam = ua_tmp->castparam;
6043         ua->user_lksb = ua_tmp->user_lksb;
6044
6045         error = set_unlock_args(flags, ua, &args);
6046         if (error)
6047                 goto out_put;
6048
6049         error = cancel_lock(ls, lkb, &args);
6050
6051         if (error == -DLM_ECANCEL)
6052                 error = 0;
6053         /* from validate_unlock_args() */
6054         if (error == -EBUSY)
6055                 error = 0;
6056  out_put:
6057         dlm_put_lkb(lkb);
6058  out:
6059         dlm_unlock_recovery(ls);
6060         kfree(ua_tmp);
6061         return error;
6062 }
6063
6064 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
6065 {
6066         struct dlm_lkb *lkb;
6067         struct dlm_args args;
6068         struct dlm_user_args *ua;
6069         struct dlm_rsb *r;
6070         int error;
6071
6072         dlm_lock_recovery(ls);
6073
6074         error = find_lkb(ls, lkid, &lkb);
6075         if (error)
6076                 goto out;
6077
6078         ua = lkb->lkb_ua;
6079
6080         error = set_unlock_args(flags, ua, &args);
6081         if (error)
6082                 goto out_put;
6083
6084         /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
6085
6086         r = lkb->lkb_resource;
6087         hold_rsb(r);
6088         lock_rsb(r);
6089
6090         error = validate_unlock_args(lkb, &args);
6091         if (error)
6092                 goto out_r;
6093         lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
6094
6095         error = _cancel_lock(r, lkb);
6096  out_r:
6097         unlock_rsb(r);
6098         put_rsb(r);
6099
6100         if (error == -DLM_ECANCEL)
6101                 error = 0;
6102         /* from validate_unlock_args() */
6103         if (error == -EBUSY)
6104                 error = 0;
6105  out_put:
6106         dlm_put_lkb(lkb);
6107  out:
6108         dlm_unlock_recovery(ls);
6109         return error;
6110 }
6111
6112 /* lkb's that are removed from the waiters list by revert are just left on the
6113    orphans list with the granted orphan locks, to be freed by purge */
6114
6115 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6116 {
6117         struct dlm_args args;
6118         int error;
6119
6120         hold_lkb(lkb); /* reference for the ls_orphans list */
6121         mutex_lock(&ls->ls_orphans_mutex);
6122         list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
6123         mutex_unlock(&ls->ls_orphans_mutex);
6124
6125         set_unlock_args(0, lkb->lkb_ua, &args);
6126
6127         error = cancel_lock(ls, lkb, &args);
6128         if (error == -DLM_ECANCEL)
6129                 error = 0;
6130         return error;
6131 }
6132
6133 /* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't
6134    granted.  Regardless of what rsb queue the lock is on, it's removed and
6135    freed.  The IVVALBLK flag causes the lvb on the resource to be invalidated
6136    if our lock is PW/EX (it's ignored if our granted mode is smaller.) */
6137
6138 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6139 {
6140         struct dlm_args args;
6141         int error;
6142
6143         set_unlock_args(DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK,
6144                         lkb->lkb_ua, &args);
6145
6146         error = unlock_lock(ls, lkb, &args);
6147         if (error == -DLM_EUNLOCK)
6148                 error = 0;
6149         return error;
6150 }
6151
6152 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
6153    (which does lock_rsb) due to deadlock with receiving a message that does
6154    lock_rsb followed by dlm_user_add_cb() */
6155
6156 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
6157                                      struct dlm_user_proc *proc)
6158 {
6159         struct dlm_lkb *lkb = NULL;
6160
6161         mutex_lock(&ls->ls_clear_proc_locks);
6162         if (list_empty(&proc->locks))
6163                 goto out;
6164
6165         lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
6166         list_del_init(&lkb->lkb_ownqueue);
6167
6168         if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6169                 lkb->lkb_flags |= DLM_IFL_ORPHAN;
6170         else
6171                 lkb->lkb_flags |= DLM_IFL_DEAD;
6172  out:
6173         mutex_unlock(&ls->ls_clear_proc_locks);
6174         return lkb;
6175 }
6176
6177 /* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
6178    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
6179    which we clear here. */
6180
6181 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
6182    list, and no more device_writes should add lkb's to proc->locks list; so we
6183    shouldn't need to take asts_spin or locks_spin here.  this assumes that
6184    device reads/writes/closes are serialized -- FIXME: we may need to serialize
6185    them ourself. */
6186
6187 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6188 {
6189         struct dlm_lkb *lkb, *safe;
6190
6191         dlm_lock_recovery(ls);
6192
6193         while (1) {
6194                 lkb = del_proc_lock(ls, proc);
6195                 if (!lkb)
6196                         break;
6197                 del_timeout(lkb);
6198                 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6199                         orphan_proc_lock(ls, lkb);
6200                 else
6201                         unlock_proc_lock(ls, lkb);
6202
6203                 /* this removes the reference for the proc->locks list
6204                    added by dlm_user_request, it may result in the lkb
6205                    being freed */
6206
6207                 dlm_put_lkb(lkb);
6208         }
6209
6210         mutex_lock(&ls->ls_clear_proc_locks);
6211
6212         /* in-progress unlocks */
6213         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6214                 list_del_init(&lkb->lkb_ownqueue);
6215                 lkb->lkb_flags |= DLM_IFL_DEAD;
6216                 dlm_put_lkb(lkb);
6217         }
6218
6219         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
6220                 memset(&lkb->lkb_callbacks, 0,
6221                        sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
6222                 list_del_init(&lkb->lkb_cb_list);
6223                 dlm_put_lkb(lkb);
6224         }
6225
6226         mutex_unlock(&ls->ls_clear_proc_locks);
6227         dlm_unlock_recovery(ls);
6228 }
6229
6230 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6231 {
6232         struct dlm_lkb *lkb, *safe;
6233
6234         while (1) {
6235                 lkb = NULL;
6236                 spin_lock(&proc->locks_spin);
6237                 if (!list_empty(&proc->locks)) {
6238                         lkb = list_entry(proc->locks.next, struct dlm_lkb,
6239                                          lkb_ownqueue);
6240                         list_del_init(&lkb->lkb_ownqueue);
6241                 }
6242                 spin_unlock(&proc->locks_spin);
6243
6244                 if (!lkb)
6245                         break;
6246
6247                 lkb->lkb_flags |= DLM_IFL_DEAD;
6248                 unlock_proc_lock(ls, lkb);
6249                 dlm_put_lkb(lkb); /* ref from proc->locks list */
6250         }
6251
6252         spin_lock(&proc->locks_spin);
6253         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6254                 list_del_init(&lkb->lkb_ownqueue);
6255                 lkb->lkb_flags |= DLM_IFL_DEAD;
6256                 dlm_put_lkb(lkb);
6257         }
6258         spin_unlock(&proc->locks_spin);
6259
6260         spin_lock(&proc->asts_spin);
6261         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
6262                 memset(&lkb->lkb_callbacks, 0,
6263                        sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
6264                 list_del_init(&lkb->lkb_cb_list);
6265                 dlm_put_lkb(lkb);
6266         }
6267         spin_unlock(&proc->asts_spin);
6268 }
6269
6270 /* pid of 0 means purge all orphans */
6271
6272 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
6273 {
6274         struct dlm_lkb *lkb, *safe;
6275
6276         mutex_lock(&ls->ls_orphans_mutex);
6277         list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
6278                 if (pid && lkb->lkb_ownpid != pid)
6279                         continue;
6280                 unlock_proc_lock(ls, lkb);
6281                 list_del_init(&lkb->lkb_ownqueue);
6282                 dlm_put_lkb(lkb);
6283         }
6284         mutex_unlock(&ls->ls_orphans_mutex);
6285 }
6286
6287 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
6288 {
6289         struct dlm_message *ms;
6290         struct dlm_mhandle *mh;
6291         int error;
6292
6293         error = _create_message(ls, sizeof(struct dlm_message), nodeid,
6294                                 DLM_MSG_PURGE, &ms, &mh);
6295         if (error)
6296                 return error;
6297         ms->m_nodeid = nodeid;
6298         ms->m_pid = pid;
6299
6300         return send_message(mh, ms);
6301 }
6302
6303 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
6304                    int nodeid, int pid)
6305 {
6306         int error = 0;
6307
6308         if (nodeid && (nodeid != dlm_our_nodeid())) {
6309                 error = send_purge(ls, nodeid, pid);
6310         } else {
6311                 dlm_lock_recovery(ls);
6312                 if (pid == current->pid)
6313                         purge_proc_locks(ls, proc);
6314                 else
6315                         do_purge(ls, nodeid, pid);
6316                 dlm_unlock_recovery(ls);
6317         }
6318         return error;
6319 }
6320