/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * dlmglue.c
 *
 * Code which implements an OCFS2 specific interface to our DLM.
 *
 * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#include <linux/types.h>
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/kthread.h>
#include <linux/pagemap.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/time.h>
#include <linux/quotaops.h>
#include <linux/sched/signal.h>

#define MLOG_MASK_PREFIX ML_DLM_GLUE
#include <cluster/masklog.h>

#include "ocfs2.h"
#include "ocfs2_lockingver.h"

#include "alloc.h"
#include "dcache.h"
#include "dlmglue.h"
#include "extent_map.h"
#include "file.h"
#include "heartbeat.h"
#include "inode.h"
#include "journal.h"
#include "stackglue.h"
#include "slot_map.h"
#include "super.h"
#include "uptodate.h"
#include "quota.h"
#include "refcounttree.h"
#include "acl.h"

#include "buffer_head_io.h"

struct ocfs2_mask_waiter {
        struct list_head        mw_item;
        int                     mw_status;
        struct completion       mw_complete;
        unsigned long           mw_mask;
        unsigned long           mw_goal;
#ifdef CONFIG_OCFS2_FS_STATS
        ktime_t                 mw_lock_start;
#endif
};

static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres);
static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres);

/*
 * Return value from ->downconvert_worker functions.
 *
 * These control the precise actions of ocfs2_unblock_lock()
 * and ocfs2_process_blocked_lock()
 *
 */
enum ocfs2_unblock_action {
        UNBLOCK_CONTINUE        = 0, /* Continue downconvert */
        UNBLOCK_CONTINUE_POST   = 1, /* Continue downconvert, fire
                                      * ->post_unlock callback */
        UNBLOCK_STOP_POST       = 2, /* Do not downconvert, fire
                                      * ->post_unlock() callback. */
};

struct ocfs2_unblock_ctl {
        int requeue;
        enum ocfs2_unblock_action unblock_action;
};

/* Lockdep class keys */
#ifdef CONFIG_DEBUG_LOCK_ALLOC
static struct lock_class_key lockdep_keys[OCFS2_NUM_LOCK_TYPES];
#endif

static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
                                        int new_level);
static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres);

static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
                                     int blocking);

static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
                                       int blocking);

static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
                                     struct ocfs2_lock_res *lockres);

static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres);

static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres,
                                            int new_level);
static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres,
                                         int blocking);

#define mlog_meta_lvb(__level, __lockres) ocfs2_dump_meta_lvb_info(__level, __PRETTY_FUNCTION__, __LINE__, __lockres)

/* This aids in debugging situations where a bad LVB might be involved. */
static void ocfs2_dump_meta_lvb_info(u64 level,
                                     const char *function,
                                     unsigned int line,
                                     struct ocfs2_lock_res *lockres)
{
        struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);

        mlog(level, "LVB information for %s (called from %s:%u):\n",
             lockres->l_name, function, line);
        mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
             lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters),
             be32_to_cpu(lvb->lvb_igeneration));
        mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
             (unsigned long long)be64_to_cpu(lvb->lvb_isize),
             be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
             be16_to_cpu(lvb->lvb_imode));
        mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, "
             "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink),
             (long long)be64_to_cpu(lvb->lvb_iatime_packed),
             (long long)be64_to_cpu(lvb->lvb_ictime_packed),
             (long long)be64_to_cpu(lvb->lvb_imtime_packed),
             be32_to_cpu(lvb->lvb_iattr));
}


/*
 * OCFS2 Lock Resource Operations
 *
 * These fine-tune the behavior of the generic dlmglue locking infrastructure.
 *
 * The most basic of lock types can point ->l_priv to their respective
 * struct ocfs2_super and allow the default actions to manage things.
 *
 * Right now, each lock type also needs to implement an init function,
 * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres()
 * should be called when the lock is no longer needed (i.e., object
 * destruction time).
 */
struct ocfs2_lock_res_ops {
        /*
         * Translate an ocfs2_lock_res * into an ocfs2_super *. Define
         * this callback if ->l_priv is not an ocfs2_super pointer
         */
        struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *);

        /*
         * Optionally called in the downconvert thread after a
         * successful downconvert. The lockres will not be referenced
         * after this callback is called, so it is safe to free
         * memory, etc.
         *
         * The exact semantics of when this is called are controlled
         * by ->downconvert_worker()
         */
        void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *);

        /*
         * Allow a lock type to add checks to determine whether it is
         * safe to downconvert a lock. Return 0 to re-queue the
         * downconvert at a later time, nonzero to continue.
         *
         * For most locks, the default checks that there are no
         * incompatible holders are sufficient.
         *
         * Called with the lockres spinlock held.
         */
        int (*check_downconvert)(struct ocfs2_lock_res *, int);

        /*
         * Allows a lock type to populate the lock value block. This
         * is called on downconvert, and when we drop a lock.
         *
         * Locks that want to use this should set LOCK_TYPE_USES_LVB
         * in the flags field.
         *
         * Called with the lockres spinlock held.
         */
        void (*set_lvb)(struct ocfs2_lock_res *);

        /*
         * Called from the downconvert thread when it is determined
         * that a lock will be downconverted. This is called without
         * any locks held so the function can do work that might
         * schedule (syncing out data, etc).
         *
         * This should return any one of the ocfs2_unblock_action
         * values, depending on what it wants the thread to do.
         */
        int (*downconvert_worker)(struct ocfs2_lock_res *, int);

        /*
         * LOCK_TYPE_* flags which describe the specific requirements
         * of a lock type. Descriptions of each individual flag follow.
         */
        int flags;
};

/*
 * Some locks want to "refresh" potentially stale data when a
 * meaningful (PRMODE or EXMODE) lock level is first obtained. If this
 * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the
 * individual lockres l_flags member from the ast function. It is
 * expected that the locking wrapper will clear the
 * OCFS2_LOCK_NEEDS_REFRESH flag when done.
 */
#define LOCK_TYPE_REQUIRES_REFRESH 0x1

/*
 * Indicate that a lock type makes use of the lock value block. The
 * ->set_lvb lock type callback must be defined.
 */
#define LOCK_TYPE_USES_LVB              0x2

static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
        .get_osb        = ocfs2_get_inode_osb,
        .flags          = 0,
};

static struct ocfs2_lock_res_ops ocfs2_inode_inode_lops = {
        .get_osb        = ocfs2_get_inode_osb,
        .check_downconvert = ocfs2_check_meta_downconvert,
        .set_lvb        = ocfs2_set_meta_lvb,
        .downconvert_worker = ocfs2_data_convert_worker,
        .flags          = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
};

static struct ocfs2_lock_res_ops ocfs2_super_lops = {
        .flags          = LOCK_TYPE_REQUIRES_REFRESH,
};

static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
        .flags          = 0,
};

static struct ocfs2_lock_res_ops ocfs2_nfs_sync_lops = {
        .flags          = 0,
};

static struct ocfs2_lock_res_ops ocfs2_trim_fs_lops = {
        .flags          = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
};

static struct ocfs2_lock_res_ops ocfs2_orphan_scan_lops = {
        .flags          = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
};

static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
        .get_osb        = ocfs2_get_dentry_osb,
        .post_unlock    = ocfs2_dentry_post_unlock,
        .downconvert_worker = ocfs2_dentry_convert_worker,
        .flags          = 0,
};

static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = {
        .get_osb        = ocfs2_get_inode_osb,
        .flags          = 0,
};

static struct ocfs2_lock_res_ops ocfs2_flock_lops = {
        .get_osb        = ocfs2_get_file_osb,
        .flags          = 0,
};

static struct ocfs2_lock_res_ops ocfs2_qinfo_lops = {
        .set_lvb        = ocfs2_set_qinfo_lvb,
        .get_osb        = ocfs2_get_qinfo_osb,
        .flags          = LOCK_TYPE_REQUIRES_REFRESH | LOCK_TYPE_USES_LVB,
};

static struct ocfs2_lock_res_ops ocfs2_refcount_block_lops = {
        .check_downconvert = ocfs2_check_refcount_downconvert,
        .downconvert_worker = ocfs2_refcount_convert_worker,
        .flags          = 0,
};

static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
{
        return lockres->l_type == OCFS2_LOCK_TYPE_META ||
                lockres->l_type == OCFS2_LOCK_TYPE_RW ||
                lockres->l_type == OCFS2_LOCK_TYPE_OPEN;
}

static inline struct ocfs2_lock_res *ocfs2_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb)
{
        return container_of(lksb, struct ocfs2_lock_res, l_lksb);
}

static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
{
        BUG_ON(!ocfs2_is_inode_lock(lockres));

        return (struct inode *) lockres->l_priv;
}

static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres)
{
        BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY);

        return (struct ocfs2_dentry_lock *)lockres->l_priv;
}

static inline struct ocfs2_mem_dqinfo *ocfs2_lock_res_qinfo(struct ocfs2_lock_res *lockres)
{
        BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_QINFO);

        return (struct ocfs2_mem_dqinfo *)lockres->l_priv;
}

static inline struct ocfs2_refcount_tree *
ocfs2_lock_res_refcount_tree(struct ocfs2_lock_res *res)
{
        return container_of(res, struct ocfs2_refcount_tree, rf_lockres);
}

static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres)
{
        if (lockres->l_ops->get_osb)
                return lockres->l_ops->get_osb(lockres);

        return (struct ocfs2_super *)lockres->l_priv;
}

static int ocfs2_lock_create(struct ocfs2_super *osb,
                             struct ocfs2_lock_res *lockres,
                             int level,
                             u32 dlm_flags);
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
                                                     int wanted);
static void __ocfs2_cluster_unlock(struct ocfs2_super *osb,
                                   struct ocfs2_lock_res *lockres,
                                   int level, unsigned long caller_ip);
static inline void ocfs2_cluster_unlock(struct ocfs2_super *osb,
                                        struct ocfs2_lock_res *lockres,
                                        int level)
{
        __ocfs2_cluster_unlock(osb, lockres, level, _RET_IP_);
}

static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres);
static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres);
static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres);
static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level);
static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
                                        struct ocfs2_lock_res *lockres);
static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
                                                int convert);
#define ocfs2_log_dlm_error(_func, _err, _lockres) do {                                 \
        if ((_lockres)->l_type != OCFS2_LOCK_TYPE_DENTRY)                               \
                mlog(ML_ERROR, "DLM error %d while calling %s on resource %s\n",        \
                     _err, _func, _lockres->l_name);                                    \
        else                                                                            \
                mlog(ML_ERROR, "DLM error %d while calling %s on resource %.*s%08x\n",  \
                     _err, _func, OCFS2_DENTRY_LOCK_INO_START - 1, (_lockres)->l_name,  \
                     (unsigned int)ocfs2_get_dentry_lock_ino(_lockres));                \
} while (0)
static int ocfs2_downconvert_thread(void *arg);
static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
                                        struct ocfs2_lock_res *lockres);
static int ocfs2_inode_lock_update(struct inode *inode,
                                  struct buffer_head **bh);
static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
static inline int ocfs2_highest_compat_lock_level(int level);
static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
                                              int new_level);
static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
                                  struct ocfs2_lock_res *lockres,
                                  int new_level,
                                  int lvb,
                                  unsigned int generation);
static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
                                        struct ocfs2_lock_res *lockres);
static int ocfs2_cancel_convert(struct ocfs2_super *osb,
                                struct ocfs2_lock_res *lockres);


static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
                                  u64 blkno,
                                  u32 generation,
                                  char *name)
{
        int len;

        BUG_ON(type >= OCFS2_NUM_LOCK_TYPES);

        len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x",
                       ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD,
                       (long long)blkno, generation);

        BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1));

        mlog(0, "built lock resource with name: %s\n", name);
}
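
/*
 * Editor's illustration (not in the original source): assuming
 * OCFS2_LOCK_ID_PAD is the string "000000" and 'M' is the metadata
 * type character, a metadata lock on block 0x1234 with generation
 * 0x5678 would be named as follows:
 *
 *	char name[OCFS2_LOCK_ID_MAX_LEN];
 *
 *	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_META, 0x1234, 0x5678, name);
 *	// name == "M000000000000000000123400005678"
 *
 * i.e. one type character, the pad, sixteen hex digits of block
 * number and eight hex digits of generation, which together make up
 * the OCFS2_LOCK_ID_MAX_LEN - 1 characters asserted above.
 */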

static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock);

static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res,
                                       struct ocfs2_dlm_debug *dlm_debug)
{
        mlog(0, "Add tracking for lockres %s\n", res->l_name);

        spin_lock(&ocfs2_dlm_tracking_lock);
        list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking);
        spin_unlock(&ocfs2_dlm_tracking_lock);
}

static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
{
        spin_lock(&ocfs2_dlm_tracking_lock);
        if (!list_empty(&res->l_debug_list))
                list_del_init(&res->l_debug_list);
        spin_unlock(&ocfs2_dlm_tracking_lock);
}

#ifdef CONFIG_OCFS2_FS_STATS
static void ocfs2_init_lock_stats(struct ocfs2_lock_res *res)
{
        res->l_lock_refresh = 0;
        memset(&res->l_lock_prmode, 0, sizeof(struct ocfs2_lock_stats));
        memset(&res->l_lock_exmode, 0, sizeof(struct ocfs2_lock_stats));
}

static void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, int level,
                                    struct ocfs2_mask_waiter *mw, int ret)
{
        u32 usec;
        ktime_t kt;
        struct ocfs2_lock_stats *stats;

        if (level == LKM_PRMODE)
                stats = &res->l_lock_prmode;
        else if (level == LKM_EXMODE)
                stats = &res->l_lock_exmode;
        else
                return;

        kt = ktime_sub(ktime_get(), mw->mw_lock_start);
        usec = ktime_to_us(kt);

        stats->ls_gets++;
        stats->ls_total += ktime_to_ns(kt);
        /* overflow */
        if (unlikely(stats->ls_gets == 0)) {
                stats->ls_gets++;
                stats->ls_total = ktime_to_ns(kt);
        }

        if (stats->ls_max < usec)
                stats->ls_max = usec;

        if (ret)
                stats->ls_fail++;
}

static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres)
{
        lockres->l_lock_refresh++;
}

static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw)
{
        mw->mw_lock_start = ktime_get();
}
#else
static inline void ocfs2_init_lock_stats(struct ocfs2_lock_res *res)
{
}
static inline void ocfs2_update_lock_stats(struct ocfs2_lock_res *res,
                           int level, struct ocfs2_mask_waiter *mw, int ret)
{
}
static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres)
{
}
static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw)
{
}
#endif

static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
                                       struct ocfs2_lock_res *res,
                                       enum ocfs2_lock_type type,
                                       struct ocfs2_lock_res_ops *ops,
                                       void *priv)
{
        res->l_type          = type;
        res->l_ops           = ops;
        res->l_priv          = priv;

        res->l_level         = DLM_LOCK_IV;
        res->l_requested     = DLM_LOCK_IV;
        res->l_blocking      = DLM_LOCK_IV;
        res->l_action        = OCFS2_AST_INVALID;
        res->l_unlock_action = OCFS2_UNLOCK_INVALID;

        res->l_flags         = OCFS2_LOCK_INITIALIZED;

        ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug);

        ocfs2_init_lock_stats(res);
#ifdef CONFIG_DEBUG_LOCK_ALLOC
        if (type != OCFS2_LOCK_TYPE_OPEN)
                lockdep_init_map(&res->l_lockdep_map, ocfs2_lock_type_strings[type],
                                 &lockdep_keys[type], 0);
        else
                res->l_lockdep_map.key = NULL;
#endif
}

void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
{
        /* This also clears out the lock status block */
        memset(res, 0, sizeof(struct ocfs2_lock_res));
        spin_lock_init(&res->l_lock);
        init_waitqueue_head(&res->l_event);
        INIT_LIST_HEAD(&res->l_blocked_list);
        INIT_LIST_HEAD(&res->l_mask_waiters);
        INIT_LIST_HEAD(&res->l_holders);
}

void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
                               enum ocfs2_lock_type type,
                               unsigned int generation,
                               struct inode *inode)
{
        struct ocfs2_lock_res_ops *ops;

        switch(type) {
                case OCFS2_LOCK_TYPE_RW:
                        ops = &ocfs2_inode_rw_lops;
                        break;
                case OCFS2_LOCK_TYPE_META:
                        ops = &ocfs2_inode_inode_lops;
                        break;
                case OCFS2_LOCK_TYPE_OPEN:
                        ops = &ocfs2_inode_open_lops;
                        break;
                default:
                        mlog_bug_on_msg(1, "type: %d\n", type);
                        ops = NULL; /* thanks, gcc */
                        break;
        }

        ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno,
                              generation, res->l_name);
        ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode);
}

static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres)
{
        struct inode *inode = ocfs2_lock_res_inode(lockres);

        return OCFS2_SB(inode->i_sb);
}

static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres)
{
        struct ocfs2_mem_dqinfo *info = lockres->l_priv;

        return OCFS2_SB(info->dqi_gi.dqi_sb);
}

static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres)
{
        struct ocfs2_file_private *fp = lockres->l_priv;

        return OCFS2_SB(fp->fp_file->f_mapping->host->i_sb);
}

static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
{
        __be64 inode_blkno_be;

        memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START],
               sizeof(__be64));

        return be64_to_cpu(inode_blkno_be);
}

static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres)
{
        struct ocfs2_dentry_lock *dl = lockres->l_priv;

        return OCFS2_SB(dl->dl_inode->i_sb);
}

void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
                                u64 parent, struct inode *inode)
{
        int len;
        u64 inode_blkno = OCFS2_I(inode)->ip_blkno;
        __be64 inode_blkno_be = cpu_to_be64(inode_blkno);
        struct ocfs2_lock_res *lockres = &dl->dl_lockres;

        ocfs2_lock_res_init_once(lockres);

        /*
         * Unfortunately, the standard lock naming scheme won't work
         * here because we have two 16 byte values to use. Instead,
         * we'll stuff the inode number as a binary value. We still
         * want error prints to show something without garbling the
         * display, so drop a null byte in there before the inode
         * number. A future version of OCFS2 will likely use all
         * binary lock names. The stringified names have been a
         * tremendous aid in debugging, but now that the debugfs
         * interface exists, we can mangle things there if need be.
         *
         * NOTE: We also drop the standard "pad" value (the total lock
         * name size stays the same though - the last part is all
         * zeros due to the memset in ocfs2_lock_res_init_once()).
         */
        len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START,
                       "%c%016llx",
                       ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY),
                       (long long)parent);

        BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1));

        memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be,
               sizeof(__be64));

        ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
                                   OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops,
                                   dl);
}
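
/*
 * Editor's sketch of the resulting name buffer (assuming 'N' is the
 * dentry lock type character; an OCFS2_DENTRY_LOCK_INO_START of 18
 * follows from the BUG_ON() above).  For a parent block 0x1234 and an
 * inode block 0xabcd:
 *
 *	byte 0      : 'N'
 *	bytes 1-16  : "0000000000001234"   (parent blkno, hex string)
 *	byte 17     : '\0'
 *	bytes 18-25 : cpu_to_be64(0xabcd)  (inode blkno, binary)
 *
 * Printing l_name as a string therefore shows only the type character
 * and the parent block number; the inode number is recovered with
 * ocfs2_get_dentry_lock_ino().
 */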

static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
                                      struct ocfs2_super *osb)
{
        /* Superblock lockres doesn't come from a slab so we call init
         * once on it manually.  */
        ocfs2_lock_res_init_once(res);
        ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO,
                              0, res->l_name);
        ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
                                   &ocfs2_super_lops, osb);
}

static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
                                       struct ocfs2_super *osb)
{
        /* Rename lockres doesn't come from a slab so we call init
         * once on it manually.  */
        ocfs2_lock_res_init_once(res);
        ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name);
        ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME,
                                   &ocfs2_rename_lops, osb);
}

static void ocfs2_nfs_sync_lock_res_init(struct ocfs2_lock_res *res,
                                         struct ocfs2_super *osb)
{
        /* nfs_sync lockres doesn't come from a slab so we call init
         * once on it manually.  */
        ocfs2_lock_res_init_once(res);
        ocfs2_build_lock_name(OCFS2_LOCK_TYPE_NFS_SYNC, 0, 0, res->l_name);
        ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_NFS_SYNC,
                                   &ocfs2_nfs_sync_lops, osb);
}

static void ocfs2_nfs_sync_lock_init(struct ocfs2_super *osb)
{
        ocfs2_nfs_sync_lock_res_init(&osb->osb_nfs_sync_lockres, osb);
        init_rwsem(&osb->nfs_sync_rwlock);
}

void ocfs2_trim_fs_lock_res_init(struct ocfs2_super *osb)
{
        struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres;

        ocfs2_lock_res_init_once(lockres);
        ocfs2_build_lock_name(OCFS2_LOCK_TYPE_TRIM_FS, 0, 0, lockres->l_name);
        ocfs2_lock_res_init_common(osb, lockres, OCFS2_LOCK_TYPE_TRIM_FS,
                                   &ocfs2_trim_fs_lops, osb);
}

void ocfs2_trim_fs_lock_res_uninit(struct ocfs2_super *osb)
{
        struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres;

        ocfs2_simple_drop_lockres(osb, lockres);
        ocfs2_lock_res_free(lockres);
}

static void ocfs2_orphan_scan_lock_res_init(struct ocfs2_lock_res *res,
                                            struct ocfs2_super *osb)
{
        ocfs2_lock_res_init_once(res);
        ocfs2_build_lock_name(OCFS2_LOCK_TYPE_ORPHAN_SCAN, 0, 0, res->l_name);
        ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_ORPHAN_SCAN,
                                   &ocfs2_orphan_scan_lops, osb);
}

void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres,
                              struct ocfs2_file_private *fp)
{
        struct inode *inode = fp->fp_file->f_mapping->host;
        struct ocfs2_inode_info *oi = OCFS2_I(inode);

        ocfs2_lock_res_init_once(lockres);
        ocfs2_build_lock_name(OCFS2_LOCK_TYPE_FLOCK, oi->ip_blkno,
                              inode->i_generation, lockres->l_name);
        ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
                                   OCFS2_LOCK_TYPE_FLOCK, &ocfs2_flock_lops,
                                   fp);
        lockres->l_flags |= OCFS2_LOCK_NOCACHE;
}

void ocfs2_qinfo_lock_res_init(struct ocfs2_lock_res *lockres,
                               struct ocfs2_mem_dqinfo *info)
{
        ocfs2_lock_res_init_once(lockres);
        ocfs2_build_lock_name(OCFS2_LOCK_TYPE_QINFO, info->dqi_gi.dqi_type,
                              0, lockres->l_name);
        ocfs2_lock_res_init_common(OCFS2_SB(info->dqi_gi.dqi_sb), lockres,
                                   OCFS2_LOCK_TYPE_QINFO, &ocfs2_qinfo_lops,
                                   info);
}

void ocfs2_refcount_lock_res_init(struct ocfs2_lock_res *lockres,
                                  struct ocfs2_super *osb, u64 ref_blkno,
                                  unsigned int generation)
{
        ocfs2_lock_res_init_once(lockres);
        ocfs2_build_lock_name(OCFS2_LOCK_TYPE_REFCOUNT, ref_blkno,
                              generation, lockres->l_name);
        ocfs2_lock_res_init_common(osb, lockres, OCFS2_LOCK_TYPE_REFCOUNT,
                                   &ocfs2_refcount_block_lops, osb);
}

void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
{
        if (!(res->l_flags & OCFS2_LOCK_INITIALIZED))
                return;

        ocfs2_remove_lockres_tracking(res);

        mlog_bug_on_msg(!list_empty(&res->l_blocked_list),
                        "Lockres %s is on the blocked list\n",
                        res->l_name);
        mlog_bug_on_msg(!list_empty(&res->l_mask_waiters),
                        "Lockres %s has mask waiters pending\n",
                        res->l_name);
        mlog_bug_on_msg(spin_is_locked(&res->l_lock),
                        "Lockres %s is locked\n",
                        res->l_name);
        mlog_bug_on_msg(res->l_ro_holders,
                        "Lockres %s has %u ro holders\n",
                        res->l_name, res->l_ro_holders);
        mlog_bug_on_msg(res->l_ex_holders,
                        "Lockres %s has %u ex holders\n",
                        res->l_name, res->l_ex_holders);

        /* Need to clear out the lock status block for the dlm */
        memset(&res->l_lksb, 0, sizeof(res->l_lksb));

        res->l_flags = 0UL;
}

/*
 * Keep a list of processes who have an interest in a lockres.
 * Note: this is now only used for checking recursive cluster locking.
 */
static inline void ocfs2_add_holder(struct ocfs2_lock_res *lockres,
                                   struct ocfs2_lock_holder *oh)
{
        INIT_LIST_HEAD(&oh->oh_list);
        oh->oh_owner_pid = get_pid(task_pid(current));

        spin_lock(&lockres->l_lock);
        list_add_tail(&oh->oh_list, &lockres->l_holders);
        spin_unlock(&lockres->l_lock);
}

static struct ocfs2_lock_holder *
ocfs2_pid_holder(struct ocfs2_lock_res *lockres,
                struct pid *pid)
{
        struct ocfs2_lock_holder *oh;

        spin_lock(&lockres->l_lock);
        list_for_each_entry(oh, &lockres->l_holders, oh_list) {
                if (oh->oh_owner_pid == pid) {
                        spin_unlock(&lockres->l_lock);
                        return oh;
                }
        }
        spin_unlock(&lockres->l_lock);
        return NULL;
}

static inline void ocfs2_remove_holder(struct ocfs2_lock_res *lockres,
                                       struct ocfs2_lock_holder *oh)
{
        spin_lock(&lockres->l_lock);
        list_del(&oh->oh_list);
        spin_unlock(&lockres->l_lock);

        put_pid(oh->oh_owner_pid);
}


static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
                                     int level)
{
        BUG_ON(!lockres);

        switch(level) {
        case DLM_LOCK_EX:
                lockres->l_ex_holders++;
                break;
        case DLM_LOCK_PR:
                lockres->l_ro_holders++;
                break;
        default:
                BUG();
        }
}

static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
                                     int level)
{
        BUG_ON(!lockres);

        switch(level) {
        case DLM_LOCK_EX:
                BUG_ON(!lockres->l_ex_holders);
                lockres->l_ex_holders--;
                break;
        case DLM_LOCK_PR:
                BUG_ON(!lockres->l_ro_holders);
                lockres->l_ro_holders--;
                break;
        default:
                BUG();
        }
}

/* WARNING: This function lives in a world where the only three lock
 * levels are EX, PR, and NL. It *will* have to be adjusted when more
 * lock types are added. */
static inline int ocfs2_highest_compat_lock_level(int level)
{
        int new_level = DLM_LOCK_EX;

        if (level == DLM_LOCK_EX)
                new_level = DLM_LOCK_NL;
        else if (level == DLM_LOCK_PR)
                new_level = DLM_LOCK_PR;
        return new_level;
}
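
/*
 * Editor's note, worked out from the function above: this is the
 * usual DLM compatibility matrix restricted to three modes:
 *
 *	blocking request	highest level we may keep holding
 *	DLM_LOCK_EX		DLM_LOCK_NL
 *	DLM_LOCK_PR		DLM_LOCK_PR
 *	DLM_LOCK_NL		DLM_LOCK_EX
 *
 * PR is compatible with other readers, while EX excludes everything.
 */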

static void lockres_set_flags(struct ocfs2_lock_res *lockres,
                              unsigned long newflags)
{
        struct ocfs2_mask_waiter *mw, *tmp;

        assert_spin_locked(&lockres->l_lock);

        lockres->l_flags = newflags;

        list_for_each_entry_safe(mw, tmp, &lockres->l_mask_waiters, mw_item) {
                if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
                        continue;

                list_del_init(&mw->mw_item);
                mw->mw_status = 0;
                complete(&mw->mw_complete);
        }
}
static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or)
{
        lockres_set_flags(lockres, lockres->l_flags | or);
}
static void lockres_clear_flags(struct ocfs2_lock_res *lockres,
                                unsigned long clear)
{
        lockres_set_flags(lockres, lockres->l_flags & ~clear);
}

static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
{
        BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
        BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
        BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
        BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);

        lockres->l_level = lockres->l_requested;
        if (lockres->l_level <=
            ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
                lockres->l_blocking = DLM_LOCK_NL;
                lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
        }
        lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
}

static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
{
        BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
        BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));

        /* Convert from RO to EX doesn't really need anything as our
         * information is already up to date. Convert from NL to
         * *anything*, however, should mark ourselves as needing an
         * update */
        if (lockres->l_level == DLM_LOCK_NL &&
            lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
                lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

        lockres->l_level = lockres->l_requested;

        /*
         * We set the OCFS2_LOCK_UPCONVERT_FINISHING flag before clearing
         * the OCFS2_LOCK_BUSY flag to prevent the dc thread from
         * downconverting the lock before the upconvert has fully completed.
         * Do not prevent the dc thread from downconverting if NONBLOCK lock
         * had already returned.
         */
        if (!(lockres->l_flags & OCFS2_LOCK_NONBLOCK_FINISHED))
                lockres_or_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);
        else
                lockres_clear_flags(lockres, OCFS2_LOCK_NONBLOCK_FINISHED);

        lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
}

static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres)
{
        BUG_ON((!(lockres->l_flags & OCFS2_LOCK_BUSY)));
        BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);

        if (lockres->l_requested > DLM_LOCK_NL &&
            !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
            lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
                lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

        lockres->l_level = lockres->l_requested;
        lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
        lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
}

static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
                                     int level)
{
        int needs_downconvert = 0;

        assert_spin_locked(&lockres->l_lock);

        if (level > lockres->l_blocking) {
                /* only schedule a downconvert if we haven't already scheduled
                 * one that goes low enough to satisfy the level we're
                 * blocking.  this also catches the case where we get
                 * duplicate BASTs */
                if (ocfs2_highest_compat_lock_level(level) <
                    ocfs2_highest_compat_lock_level(lockres->l_blocking))
                        needs_downconvert = 1;

                lockres->l_blocking = level;
        }

        mlog(ML_BASTS, "lockres %s, block %d, level %d, l_block %d, dwn %d\n",
             lockres->l_name, level, lockres->l_level, lockres->l_blocking,
             needs_downconvert);

        if (needs_downconvert)
                lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
        mlog(0, "needs_downconvert = %d\n", needs_downconvert);
        return needs_downconvert;
}

/*
 * OCFS2_LOCK_PENDING and l_pending_gen.
 *
 * Why does OCFS2_LOCK_PENDING exist?  To close a race between setting
 * OCFS2_LOCK_BUSY and calling ocfs2_dlm_lock().  See ocfs2_unblock_lock()
 * for more details on the race.
 *
 * OCFS2_LOCK_PENDING closes the race quite nicely.  However, it introduces
 * a race on itself.  In o2dlm, we can get the ast before ocfs2_dlm_lock()
 * returns.  The ast clears OCFS2_LOCK_BUSY, and must therefore clear
 * OCFS2_LOCK_PENDING at the same time.  When ocfs2_dlm_lock() returns,
 * the caller is going to try to clear PENDING again.  If nothing else is
 * happening, __lockres_clear_pending() sees PENDING is unset and does
 * nothing.
 *
 * But what if another path (eg downconvert thread) has just started a
 * new locking action?  The other path has re-set PENDING.  Our path
 * cannot clear PENDING, because that will re-open the original race
 * window.
 *
 * [Example]
 *
 * ocfs2_meta_lock()
 *  ocfs2_cluster_lock()
 *   set BUSY
 *   set PENDING
 *   drop l_lock
 *   ocfs2_dlm_lock()
 *    ocfs2_locking_ast()               ocfs2_downconvert_thread()
 *     clear PENDING                     ocfs2_unblock_lock()
 *                                        take_l_lock
 *                                        !BUSY
 *                                        ocfs2_prepare_downconvert()
 *                                         set BUSY
 *                                         set PENDING
 *                                        drop l_lock
 *   take l_lock
 *   clear PENDING
 *   drop l_lock
 *                      <window>
 *                                        ocfs2_dlm_lock()
 *
 * So as you can see, we now have a window where l_lock is not held,
 * PENDING is not set, and ocfs2_dlm_lock() has not been called.
 *
 * The core problem is that ocfs2_cluster_lock() has cleared the PENDING
 * set by ocfs2_prepare_downconvert().  That wasn't nice.
 *
 * To solve this we introduce l_pending_gen.  A call to
 * lockres_clear_pending() will only do so when it is passed a generation
 * number that matches the lockres.  lockres_set_pending() will return the
 * current generation number.  When ocfs2_cluster_lock() goes to clear
 * PENDING, it passes the generation it got from set_pending().  In our
 * example above, the generation numbers will *not* match.  Thus,
 * ocfs2_cluster_lock() will not clear the PENDING set by
 * ocfs2_prepare_downconvert().
 */

/* Unlocked version for ocfs2_locking_ast() */
static void __lockres_clear_pending(struct ocfs2_lock_res *lockres,
                                    unsigned int generation,
                                    struct ocfs2_super *osb)
{
        assert_spin_locked(&lockres->l_lock);

        /*
         * The ast and locking functions can race us here.  The winner
         * will clear pending, the loser will not.
         */
        if (!(lockres->l_flags & OCFS2_LOCK_PENDING) ||
            (lockres->l_pending_gen != generation))
                return;

        lockres_clear_flags(lockres, OCFS2_LOCK_PENDING);
        lockres->l_pending_gen++;

        /*
         * The downconvert thread may have skipped us because we
         * were PENDING.  Wake it up.
         */
        if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
                ocfs2_wake_downconvert_thread(osb);
}

/* Locked version for callers of ocfs2_dlm_lock() */
static void lockres_clear_pending(struct ocfs2_lock_res *lockres,
                                  unsigned int generation,
                                  struct ocfs2_super *osb)
{
        unsigned long flags;

        spin_lock_irqsave(&lockres->l_lock, flags);
        __lockres_clear_pending(lockres, generation, osb);
        spin_unlock_irqrestore(&lockres->l_lock, flags);
}

static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres)
{
        assert_spin_locked(&lockres->l_lock);
        BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));

        lockres_or_flags(lockres, OCFS2_LOCK_PENDING);

        return lockres->l_pending_gen;
}
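
/*
 * Editor's sketch of the set/clear pattern these helpers are meant
 * for (ocfs2_lock_create() below is a real instance):
 *
 *	spin_lock_irqsave(&lockres->l_lock, flags);
 *	// ... set l_action, l_requested and OCFS2_LOCK_BUSY ...
 *	gen = lockres_set_pending(lockres);
 *	spin_unlock_irqrestore(&lockres->l_lock, flags);
 *
 *	ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, ...);
 *	lockres_clear_pending(lockres, gen, osb);
 *
 * If the ast has already cleared PENDING and bumped l_pending_gen,
 * the generation check turns the final clear_pending into a no-op.
 */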

static void ocfs2_blocking_ast(struct ocfs2_dlm_lksb *lksb, int level)
{
        struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
        struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
        int needs_downconvert;
        unsigned long flags;

        BUG_ON(level <= DLM_LOCK_NL);

        mlog(ML_BASTS, "BAST fired for lockres %s, blocking %d, level %d, "
             "type %s\n", lockres->l_name, level, lockres->l_level,
             ocfs2_lock_type_string(lockres->l_type));

        /*
         * We can skip the bast for locks which don't enable caching -
         * they'll be dropped at the earliest possible time anyway.
         */
        if (lockres->l_flags & OCFS2_LOCK_NOCACHE)
                return;

        spin_lock_irqsave(&lockres->l_lock, flags);
        needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
        if (needs_downconvert)
                ocfs2_schedule_blocked_lock(osb, lockres);
        spin_unlock_irqrestore(&lockres->l_lock, flags);

        wake_up(&lockres->l_event);

        ocfs2_wake_downconvert_thread(osb);
}

static void ocfs2_locking_ast(struct ocfs2_dlm_lksb *lksb)
{
        struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
        struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
        unsigned long flags;
        int status;

        spin_lock_irqsave(&lockres->l_lock, flags);

        status = ocfs2_dlm_lock_status(&lockres->l_lksb);

        if (status == -EAGAIN) {
                lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
                goto out;
        }

        if (status) {
                mlog(ML_ERROR, "lockres %s: lksb status value of %d!\n",
                     lockres->l_name, status);
                spin_unlock_irqrestore(&lockres->l_lock, flags);
                return;
        }

        mlog(ML_BASTS, "AST fired for lockres %s, action %d, unlock %d, "
             "level %d => %d\n", lockres->l_name, lockres->l_action,
             lockres->l_unlock_action, lockres->l_level, lockres->l_requested);

        switch(lockres->l_action) {
        case OCFS2_AST_ATTACH:
                ocfs2_generic_handle_attach_action(lockres);
                lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
                break;
        case OCFS2_AST_CONVERT:
                ocfs2_generic_handle_convert_action(lockres);
                break;
        case OCFS2_AST_DOWNCONVERT:
                ocfs2_generic_handle_downconvert_action(lockres);
                break;
        default:
                mlog(ML_ERROR, "lockres %s: AST fired with invalid action: %u, "
                     "flags 0x%lx, unlock: %u\n",
                     lockres->l_name, lockres->l_action, lockres->l_flags,
                     lockres->l_unlock_action);
                BUG();
        }
out:
        /* set it to something invalid so if we get called again we
         * can catch it. */
        lockres->l_action = OCFS2_AST_INVALID;

        /* Did we try to cancel this lock?  Clear that state */
        if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT)
                lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;

        /*
         * We may have beaten the locking functions here.  We certainly
         * know that dlm_lock() has been called :-)
         * Because we can't have two lock calls in flight at once, we
         * can use lockres->l_pending_gen.
         */
        __lockres_clear_pending(lockres, lockres->l_pending_gen, osb);

        wake_up(&lockres->l_event);
        spin_unlock_irqrestore(&lockres->l_lock, flags);
}

static void ocfs2_unlock_ast(struct ocfs2_dlm_lksb *lksb, int error)
{
        struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
        unsigned long flags;

        mlog(ML_BASTS, "UNLOCK AST fired for lockres %s, action = %d\n",
             lockres->l_name, lockres->l_unlock_action);

        spin_lock_irqsave(&lockres->l_lock, flags);
        if (error) {
                mlog(ML_ERROR, "Dlm passes error %d for lock %s, "
                     "unlock_action %d\n", error, lockres->l_name,
                     lockres->l_unlock_action);
                spin_unlock_irqrestore(&lockres->l_lock, flags);
                return;
        }

        switch(lockres->l_unlock_action) {
        case OCFS2_UNLOCK_CANCEL_CONVERT:
                mlog(0, "Cancel convert success for %s\n", lockres->l_name);
                lockres->l_action = OCFS2_AST_INVALID;
                /* Downconvert thread may have requeued this lock, we
                 * need to wake it. */
                if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
                        ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres));
                break;
        case OCFS2_UNLOCK_DROP_LOCK:
                lockres->l_level = DLM_LOCK_IV;
                break;
        default:
                BUG();
        }

        lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
        lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
        wake_up(&lockres->l_event);
        spin_unlock_irqrestore(&lockres->l_lock, flags);
}

/*
 * This is the filesystem locking protocol.  It provides the lock handling
 * hooks for the underlying DLM.  It has a maximum version number.
 * The version number allows interoperability with systems running at
 * the same major number and an equal or smaller minor number.
 *
 * Whenever the filesystem does new things with locks (adds or removes a
 * lock, orders them differently, does different things underneath a lock),
 * the version must be changed.  The protocol is negotiated when joining
 * the dlm domain.  A node may join the domain if its major version is
 * identical to all other nodes and its minor version is greater than
 * or equal to all other nodes.  When its minor version is greater than
 * the other nodes, it will run at the minor version specified by the
 * other nodes.
 *
 * If a locking change is made that will not be compatible with older
 * versions, the major number must be increased and the minor version set
 * to zero.  If a change merely adds a behavior that can be disabled when
 * speaking to older versions, the minor version must be increased.  If a
 * change adds a fully backwards compatible change (eg, LVB changes that
 * are just ignored by older versions), the version does not need to be
 * updated.
 */
static struct ocfs2_locking_protocol lproto = {
        .lp_max_version = {
                .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
                .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
        },
        .lp_lock_ast            = ocfs2_locking_ast,
        .lp_blocking_ast        = ocfs2_blocking_ast,
        .lp_unlock_ast          = ocfs2_unlock_ast,
};

void ocfs2_set_locking_protocol(void)
{
        ocfs2_stack_glue_set_max_proto_version(&lproto.lp_max_version);
}

static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
                                                int convert)
{
        unsigned long flags;

        spin_lock_irqsave(&lockres->l_lock, flags);
        lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
        lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);
        if (convert)
                lockres->l_action = OCFS2_AST_INVALID;
        else
                lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
        spin_unlock_irqrestore(&lockres->l_lock, flags);

        wake_up(&lockres->l_event);
}

/* Note: If we detect another process working on the lock (i.e.,
 * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller
 * to do the right thing in that case.
 */
static int ocfs2_lock_create(struct ocfs2_super *osb,
                             struct ocfs2_lock_res *lockres,
                             int level,
                             u32 dlm_flags)
{
        int ret = 0;
        unsigned long flags;
        unsigned int gen;

        mlog(0, "lock %s, level = %d, flags = %u\n", lockres->l_name, level,
             dlm_flags);

        spin_lock_irqsave(&lockres->l_lock, flags);
        if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) ||
            (lockres->l_flags & OCFS2_LOCK_BUSY)) {
                spin_unlock_irqrestore(&lockres->l_lock, flags);
                goto bail;
        }

        lockres->l_action = OCFS2_AST_ATTACH;
        lockres->l_requested = level;
        lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
        gen = lockres_set_pending(lockres);
        spin_unlock_irqrestore(&lockres->l_lock, flags);

        ret = ocfs2_dlm_lock(osb->cconn,
                             level,
                             &lockres->l_lksb,
                             dlm_flags,
                             lockres->l_name,
                             OCFS2_LOCK_ID_MAX_LEN - 1);
        lockres_clear_pending(lockres, gen, osb);
        if (ret) {
                ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
                ocfs2_recover_from_dlm_error(lockres, 1);
        }

        mlog(0, "lock %s, return from ocfs2_dlm_lock\n", lockres->l_name);

bail:
        return ret;
}

static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres,
                                        int flag)
{
        unsigned long flags;
        int ret;

        spin_lock_irqsave(&lockres->l_lock, flags);
        ret = lockres->l_flags & flag;
        spin_unlock_irqrestore(&lockres->l_lock, flags);

        return ret;
}

static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres)
{
        wait_event(lockres->l_event,
                   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY));
}

static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres)
{
        wait_event(lockres->l_event,
                   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING));
}

/* predict what lock level we'll be dropping down to on behalf
 * of another node, and return true if the currently wanted
 * level will be compatible with it. */
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
                                                     int wanted)
{
        BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));

        return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
}

static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw)
{
        INIT_LIST_HEAD(&mw->mw_item);
        init_completion(&mw->mw_complete);
        ocfs2_init_start_time(mw);
}

static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw)
{
        wait_for_completion(&mw->mw_complete);
        /* Re-arm the completion in case we want to wait on it again */
        reinit_completion(&mw->mw_complete);
        return mw->mw_status;
}

static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres,
                                    struct ocfs2_mask_waiter *mw,
                                    unsigned long mask,
                                    unsigned long goal)
{
        BUG_ON(!list_empty(&mw->mw_item));

        assert_spin_locked(&lockres->l_lock);

        list_add_tail(&mw->mw_item, &lockres->l_mask_waiters);
        mw->mw_mask = mask;
        mw->mw_goal = goal;
}
1405
1406 /* returns 0 if the mw that was removed was already satisfied, -EBUSY
1407  * if the mask still hadn't reached its goal */
1408 static int __lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
1409                                       struct ocfs2_mask_waiter *mw)
1410 {
1411         int ret = 0;
1412
1413         assert_spin_locked(&lockres->l_lock);
1414         if (!list_empty(&mw->mw_item)) {
1415                 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
1416                         ret = -EBUSY;
1417
1418                 list_del_init(&mw->mw_item);
1419                 init_completion(&mw->mw_complete);
1420         }
1421
1422         return ret;
1423 }
1424
1425 static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
1426                                       struct ocfs2_mask_waiter *mw)
1427 {
1428         unsigned long flags;
1429         int ret = 0;
1430
1431         spin_lock_irqsave(&lockres->l_lock, flags);
1432         ret = __lockres_remove_mask_waiter(lockres, mw);
1433         spin_unlock_irqrestore(&lockres->l_lock, flags);
1434
1435         return ret;
1437 }
1438
1439 static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw,
1440                                              struct ocfs2_lock_res *lockres)
1441 {
1442         int ret;
1443
1444         ret = wait_for_completion_interruptible(&mw->mw_complete);
1445         if (ret)
1446                 lockres_remove_mask_waiter(lockres, mw);
1447         else
1448                 ret = mw->mw_status;
1449         /* Re-arm the completion in case we want to wait on it again */
1450         reinit_completion(&mw->mw_complete);
1451         return ret;
1452 }
1453
1454 static int __ocfs2_cluster_lock(struct ocfs2_super *osb,
1455                                 struct ocfs2_lock_res *lockres,
1456                                 int level,
1457                                 u32 lkm_flags,
1458                                 int arg_flags,
1459                                 int l_subclass,
1460                                 unsigned long caller_ip)
1461 {
1462         struct ocfs2_mask_waiter mw;
1463         int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
1464         int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
1465         unsigned long flags;
1466         unsigned int gen;
1467         int noqueue_attempted = 0;
1468         int dlm_locked = 0;
1469         int kick_dc = 0;
1470
1471         if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED)) {
1472                 mlog_errno(-EINVAL);
1473                 return -EINVAL;
1474         }
1475
1476         ocfs2_init_mask_waiter(&mw);
1477
1478         if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
1479                 lkm_flags |= DLM_LKF_VALBLK;
1480
1481 again:
1482         wait = 0;
1483
1484         spin_lock_irqsave(&lockres->l_lock, flags);
1485
1486         if (catch_signals && signal_pending(current)) {
1487                 ret = -ERESTARTSYS;
1488                 goto unlock;
1489         }
1490
1491         mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
1492                         "Cluster lock called on freeing lockres %s! flags "
1493                         "0x%lx\n", lockres->l_name, lockres->l_flags);
1494
1495         /* We only compare against the currently granted level
1496          * here. If the lock is blocked waiting on a downconvert,
1497          * we'll get caught below. */
1498         if (lockres->l_flags & OCFS2_LOCK_BUSY &&
1499             level > lockres->l_level) {
1500                 /* is someone sitting in dlm_lock? If so, wait on
1501                  * them. */
1502                 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1503                 wait = 1;
1504                 goto unlock;
1505         }
1506
1507         if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING) {
1508                 /*
1509                  * We've upconverted. If the lock now has a level we can
1510                  * work with, we take it. If, however, the lock is not at the
1511                  * required level, we go thru the full cycle. One way this could
1512                  * happen is if a process requesting an upconvert to PR is
1513                  * closely followed by another requesting upconvert to an EX.
1514                  * If the process requesting EX lands here, we want it to
1515                  * continue attempting to upconvert and let the process
1516                  * requesting PR take the lock.
1517                  * If multiple processes request upconvert to PR, the first one
1518                  * here will take the lock. The others will have to go thru the
1519                  * OCFS2_LOCK_BLOCKED check to ensure that there is no pending
1520                  * downconvert request.
1521                  */
1522                 if (level <= lockres->l_level)
1523                         goto update_holders;
1524         }
1525
1526         if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
1527             !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
1528                 /* the lock is currently blocked on behalf of
1529                  * another node */
1530                 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0);
1531                 wait = 1;
1532                 goto unlock;
1533         }
1534
1535         if (level > lockres->l_level) {
1536                 if (noqueue_attempted > 0) {
1537                         ret = -EAGAIN;
1538                         goto unlock;
1539                 }
1540                 if (lkm_flags & DLM_LKF_NOQUEUE)
1541                         noqueue_attempted = 1;
1542
1543                 if (lockres->l_action != OCFS2_AST_INVALID)
1544                         mlog(ML_ERROR, "lockres %s has action %u pending\n",
1545                              lockres->l_name, lockres->l_action);
1546
1547                 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
1548                         lockres->l_action = OCFS2_AST_ATTACH;
1549                         lkm_flags &= ~DLM_LKF_CONVERT;
1550                 } else {
1551                         lockres->l_action = OCFS2_AST_CONVERT;
1552                         lkm_flags |= DLM_LKF_CONVERT;
1553                 }
1554
1555                 lockres->l_requested = level;
1556                 lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
1557                 gen = lockres_set_pending(lockres);
1558                 spin_unlock_irqrestore(&lockres->l_lock, flags);
1559
1560                 BUG_ON(level == DLM_LOCK_IV);
1561                 BUG_ON(level == DLM_LOCK_NL);
1562
1563                 mlog(ML_BASTS, "lockres %s, convert from %d to %d\n",
1564                      lockres->l_name, lockres->l_level, level);
1565
1566                 /* call dlm_lock to upgrade lock now */
1567                 ret = ocfs2_dlm_lock(osb->cconn,
1568                                      level,
1569                                      &lockres->l_lksb,
1570                                      lkm_flags,
1571                                      lockres->l_name,
1572                                      OCFS2_LOCK_ID_MAX_LEN - 1);
1573                 lockres_clear_pending(lockres, gen, osb);
1574                 if (ret) {
1575                         if (!(lkm_flags & DLM_LKF_NOQUEUE) ||
1576                             (ret != -EAGAIN)) {
1577                                 ocfs2_log_dlm_error("ocfs2_dlm_lock",
1578                                                     ret, lockres);
1579                         }
1580                         ocfs2_recover_from_dlm_error(lockres, 1);
1581                         goto out;
1582                 }
1583                 dlm_locked = 1;
1584
1585                 mlog(0, "lock %s, successful return from ocfs2_dlm_lock\n",
1586                      lockres->l_name);
1587
1588                 /* At this point we've gone inside the dlm and need to
1589                  * complete our work regardless. */
1590                 catch_signals = 0;
1591
1592                 /* wait for busy to clear and carry on */
1593                 goto again;
1594         }
1595
1596 update_holders:
1597         /* Ok, if we get here then we're good to go. */
1598         ocfs2_inc_holders(lockres, level);
1599
1600         ret = 0;
1601 unlock:
1602         lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);
1603
1604         /* ocfs2_unblock_lock requeues on seeing OCFS2_LOCK_UPCONVERT_FINISHING */
1605         kick_dc = (lockres->l_flags & OCFS2_LOCK_BLOCKED);
1606
1607         spin_unlock_irqrestore(&lockres->l_lock, flags);
1608         if (kick_dc)
1609                 ocfs2_wake_downconvert_thread(osb);
1610 out:
1611         /*
1612          * This helps work around a lock inversion between the page lock
1613          * and dlm locks.  One path holds the page lock while calling aops
1614          * which block acquiring dlm locks.  The downconvert thread holds
1615          * dlm locks while acquiring page locks while downconverting data
1616          * locks.  This block helps an aop path notice the inversion and
1617          * back off to unlock its page lock before retrying the dlm lock.
1618          */
1619         if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
1620             mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
1621                 wait = 0;
1622                 spin_lock_irqsave(&lockres->l_lock, flags);
1623                 if (__lockres_remove_mask_waiter(lockres, &mw)) {
1624                         if (dlm_locked)
1625                                 lockres_or_flags(lockres,
1626                                         OCFS2_LOCK_NONBLOCK_FINISHED);
1627                         spin_unlock_irqrestore(&lockres->l_lock, flags);
1628                         ret = -EAGAIN;
1629                 } else {
1630                         spin_unlock_irqrestore(&lockres->l_lock, flags);
1631                         goto again;
1632                 }
1633         }
1634         if (wait) {
1635                 ret = ocfs2_wait_for_mask(&mw);
1636                 if (ret == 0)
1637                         goto again;
1638                 mlog_errno(ret);
1639         }
1640         ocfs2_update_lock_stats(lockres, level, &mw, ret);
1641
1642 #ifdef CONFIG_DEBUG_LOCK_ALLOC
1643         if (!ret && lockres->l_lockdep_map.key != NULL) {
1644                 if (level == DLM_LOCK_PR)
1645                         rwsem_acquire_read(&lockres->l_lockdep_map, l_subclass,
1646                                 !!(arg_flags & OCFS2_META_LOCK_NOQUEUE),
1647                                 caller_ip);
1648                 else
1649                         rwsem_acquire(&lockres->l_lockdep_map, l_subclass,
1650                                 !!(arg_flags & OCFS2_META_LOCK_NOQUEUE),
1651                                 caller_ip);
1652         }
1653 #endif
1654         return ret;
1655 }
1656
1657 static inline int ocfs2_cluster_lock(struct ocfs2_super *osb,
1658                                      struct ocfs2_lock_res *lockres,
1659                                      int level,
1660                                      u32 lkm_flags,
1661                                      int arg_flags)
1662 {
1663         return __ocfs2_cluster_lock(osb, lockres, level, lkm_flags, arg_flags,
1664                                     0, _RET_IP_);
1665 }
1666
1668 static void __ocfs2_cluster_unlock(struct ocfs2_super *osb,
1669                                    struct ocfs2_lock_res *lockres,
1670                                    int level,
1671                                    unsigned long caller_ip)
1672 {
1673         unsigned long flags;
1674
1675         spin_lock_irqsave(&lockres->l_lock, flags);
1676         ocfs2_dec_holders(lockres, level);
1677         ocfs2_downconvert_on_unlock(osb, lockres);
1678         spin_unlock_irqrestore(&lockres->l_lock, flags);
1679 #ifdef CONFIG_DEBUG_LOCK_ALLOC
1680         if (lockres->l_lockdep_map.key != NULL)
1681                 rwsem_release(&lockres->l_lockdep_map, 1, caller_ip);
1682 #endif
1683 }
1684
1685 static int ocfs2_create_new_lock(struct ocfs2_super *osb,
1686                                  struct ocfs2_lock_res *lockres,
1687                                  int ex,
1688                                  int local)
1689 {
1690         int level =  ex ? DLM_LOCK_EX : DLM_LOCK_PR;
1691         unsigned long flags;
1692         u32 lkm_flags = local ? DLM_LKF_LOCAL : 0;
1693
1694         spin_lock_irqsave(&lockres->l_lock, flags);
1695         BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
1696         lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
1697         spin_unlock_irqrestore(&lockres->l_lock, flags);
1698
1699         return ocfs2_lock_create(osb, lockres, level, lkm_flags);
1700 }
1701
1702 /* Grants us an EX lock on the data and metadata resources, skipping
1703  * the normal cluster directory lookup. Use this ONLY on newly created
1704  * inodes which other nodes can't possibly see, and which haven't been
1705  * hashed in the inode hash yet. This can give us a good performance
1706  * increase as it'll skip the network broadcast normally associated
1707  * with creating a new lock resource. */
1708 int ocfs2_create_new_inode_locks(struct inode *inode)
1709 {
1710         int ret;
1711         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1712
1713         BUG_ON(!ocfs2_inode_is_new(inode));
1714
1715         mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno);
1716
1717         /* NOTE: We don't increment any of the holder counts, nor
1718          * do we add anything to a journal handle. Since this is
1719          * supposed to be a new inode which the cluster doesn't know
1720          * about yet, there is no need to.  As far as the LVB handling
1721          * is concerned, this is basically like acquiring an EX lock
1722          * on a resource which has an invalid LVB -- we'll set it
1723          * valid when we release the EX. */
1724
1725         ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1);
1726         if (ret) {
1727                 mlog_errno(ret);
1728                 goto bail;
1729         }
1730
1731         /*
1732          * We don't want to use DLM_LKF_LOCAL on a metadata lock, as
1733          * metadata locks don't use a generation in their lock names.
1734          */
1735         ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0);
1736         if (ret) {
1737                 mlog_errno(ret);
1738                 goto bail;
1739         }
1740
1741         ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_open_lockres, 0, 0);
1742         if (ret)
1743                 mlog_errno(ret);
1744
1745 bail:
1746         return ret;
1747 }
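
/*
 * Usage sketch (assumption: the call site is the new-inode creation
 * path, after the inode is initialized but before it is inserted into
 * the inode hash, so no other node can race with us):
 *
 *	status = ocfs2_create_new_inode_locks(inode);
 *	if (status < 0)
 *		mlog_errno(status);
 */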
1748
1749 int ocfs2_rw_lock(struct inode *inode, int write)
1750 {
1751         int status, level;
1752         struct ocfs2_lock_res *lockres;
1753         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1754
1755         mlog(0, "inode %llu take %s RW lock\n",
1756              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1757              write ? "EXMODE" : "PRMODE");
1758
1759         if (ocfs2_mount_local(osb))
1760                 return 0;
1761
1762         lockres = &OCFS2_I(inode)->ip_rw_lockres;
1763
1764         level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
1765
1766         status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
1767         if (status < 0)
1768                 mlog_errno(status);
1769
1770         return status;
1771 }
1772
1773 int ocfs2_try_rw_lock(struct inode *inode, int write)
1774 {
1775         int status, level;
1776         struct ocfs2_lock_res *lockres;
1777         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1778
1779         mlog(0, "inode %llu try to take %s RW lock\n",
1780              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1781              write ? "EXMODE" : "PRMODE");
1782
1783         if (ocfs2_mount_local(osb))
1784                 return 0;
1785
1786         lockres = &OCFS2_I(inode)->ip_rw_lockres;
1787
1788         level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
1789
1790         status = ocfs2_cluster_lock(osb, lockres, level, DLM_LKF_NOQUEUE, 0);
1791         return status;
1792 }
1793
1794 void ocfs2_rw_unlock(struct inode *inode, int write)
1795 {
1796         int level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
1797         struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;
1798         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1799
1800         mlog(0, "inode %llu drop %s RW lock\n",
1801              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1802              write ? "EXMODE" : "PRMODE");
1803
1804         if (!ocfs2_mount_local(osb))
1805                 ocfs2_cluster_unlock(osb, lockres, level);
1806 }
1807
1808 /*
1809  * ocfs2_open_lock always takes a PR mode lock.
1810  */
1811 int ocfs2_open_lock(struct inode *inode)
1812 {
1813         int status = 0;
1814         struct ocfs2_lock_res *lockres;
1815         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1816
1817         mlog(0, "inode %llu take PRMODE open lock\n",
1818              (unsigned long long)OCFS2_I(inode)->ip_blkno);
1819
1820         if (ocfs2_is_hard_readonly(osb) || ocfs2_mount_local(osb))
1821                 goto out;
1822
1823         lockres = &OCFS2_I(inode)->ip_open_lockres;
1824
1825         status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_PR, 0, 0);
1826         if (status < 0)
1827                 mlog_errno(status);
1828
1829 out:
1830         return status;
1831 }
1832
1833 int ocfs2_try_open_lock(struct inode *inode, int write)
1834 {
1835         int status = 0, level;
1836         struct ocfs2_lock_res *lockres;
1837         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1838
1839         mlog(0, "inode %llu try to take %s open lock\n",
1840              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1841              write ? "EXMODE" : "PRMODE");
1842
1843         if (ocfs2_is_hard_readonly(osb)) {
1844                 if (write)
1845                         status = -EROFS;
1846                 goto out;
1847         }
1848
1849         if (ocfs2_mount_local(osb))
1850                 goto out;
1851
1852         lockres = &OCFS2_I(inode)->ip_open_lockres;
1853
1854         level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
1855
1856         /*
1857          * The file system may already be holding a PRMODE/EXMODE open lock.
1858          * Since we pass DLM_LKF_NOQUEUE, the request won't block waiting on
1859          * other nodes and the -EAGAIN will indicate to the caller that
1860          * this inode is still in use.
1861          */
1862         status = ocfs2_cluster_lock(osb, lockres, level, DLM_LKF_NOQUEUE, 0);
1863
1864 out:
1865         return status;
1866 }
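
/*
 * Example usage sketch (an assumed caller shape, not taken verbatim
 * from this tree): the inode wipe path can use this to probe whether
 * any other node still holds the open lock; with DLM_LKF_NOQUEUE an
 * -EAGAIN answer means "still in use elsewhere":
 *
 *	status = ocfs2_try_open_lock(inode, 1);
 *	if (status == -EAGAIN)
 *		goto defer_wipe;	-- "defer_wipe" is a made-up label
 */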
1867
1868 /*
1869  * ocfs2_open_unlock unlocks both PR and EX mode open locks.
1870  */
1871 void ocfs2_open_unlock(struct inode *inode)
1872 {
1873         struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_open_lockres;
1874         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1875
1876         mlog(0, "inode %llu drop open lock\n",
1877              (unsigned long long)OCFS2_I(inode)->ip_blkno);
1878
1879         if (ocfs2_mount_local(osb))
1880                 goto out;
1881
1882         if (lockres->l_ro_holders)
1883                 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_PR);
1884         if (lockres->l_ex_holders)
1885                 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
1886
1887 out:
1888         return;
1889 }
1890
1891 static int ocfs2_flock_handle_signal(struct ocfs2_lock_res *lockres,
1892                                      int level)
1893 {
1894         int ret;
1895         struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
1896         unsigned long flags;
1897         struct ocfs2_mask_waiter mw;
1898
1899         ocfs2_init_mask_waiter(&mw);
1900
1901 retry_cancel:
1902         spin_lock_irqsave(&lockres->l_lock, flags);
1903         if (lockres->l_flags & OCFS2_LOCK_BUSY) {
1904                 ret = ocfs2_prepare_cancel_convert(osb, lockres);
1905                 if (ret) {
1906                         spin_unlock_irqrestore(&lockres->l_lock, flags);
1907                         ret = ocfs2_cancel_convert(osb, lockres);
1908                         if (ret < 0) {
1909                                 mlog_errno(ret);
1910                                 goto out;
1911                         }
1912                         goto retry_cancel;
1913                 }
1914                 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1915                 spin_unlock_irqrestore(&lockres->l_lock, flags);
1916
1917                 ocfs2_wait_for_mask(&mw);
1918                 goto retry_cancel;
1919         }
1920
1921         ret = -ERESTARTSYS;
1922         /*
1923          * We may still have gotten the lock, in which case there's no
1924          * point to restarting the syscall.
1925          */
1926         if (lockres->l_level == level)
1927                 ret = 0;
1928
1929         mlog(0, "Cancel returning %d. flags: 0x%lx, level: %d, act: %d\n", ret,
1930              lockres->l_flags, lockres->l_level, lockres->l_action);
1931
1932         spin_unlock_irqrestore(&lockres->l_lock, flags);
1933
1934 out:
1935         return ret;
1936 }
1937
1938 /*
1939  * ocfs2_file_lock() and ocfs2_file_unlock() map to a single pair of
1940  * flock() calls. The locking approach this requires is sufficiently
1941  * different from all other cluster lock types that we implement a
1942  * separate path to the "low-level" dlm calls. In particular:
1943  *
1944  * - No optimization of lock levels is done - we take exactly
1945  *   what's been requested.
1946  *
1947  * - No lock caching is employed. We immediately downconvert to
1948  *   no-lock at unlock time. (This also means flock locks never go
1949  *   on the blocking list.)
1950  *
1951  * - Since userspace can trivially deadlock itself with flock, we make
1952  *   sure to allow cancellation of a misbehaving application's flock()
1953  *   request.
1954  *
1955  * - Access to any flock lockres doesn't require concurrency, so we
1956  *   can simplify the code by requiring the caller to guarantee
1957  *   serialization of dlmglue flock calls.
1958  */
1959 int ocfs2_file_lock(struct file *file, int ex, int trylock)
1960 {
1961         int ret, level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
1962         unsigned int lkm_flags = trylock ? DLM_LKF_NOQUEUE : 0;
1963         unsigned long flags;
1964         struct ocfs2_file_private *fp = file->private_data;
1965         struct ocfs2_lock_res *lockres = &fp->fp_flock;
1966         struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
1967         struct ocfs2_mask_waiter mw;
1968
1969         ocfs2_init_mask_waiter(&mw);
1970
1971         if ((lockres->l_flags & OCFS2_LOCK_BUSY) ||
1972             (lockres->l_level > DLM_LOCK_NL)) {
1973                 mlog(ML_ERROR,
1974                      "File lock \"%s\" has busy or locked state: flags: 0x%lx, "
1975                      "level: %u\n", lockres->l_name, lockres->l_flags,
1976                      lockres->l_level);
1977                 return -EINVAL;
1978         }
1979
1980         spin_lock_irqsave(&lockres->l_lock, flags);
1981         if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
1982                 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1983                 spin_unlock_irqrestore(&lockres->l_lock, flags);
1984
1985                 /*
1986                  * Get the lock at NLMODE to start - that way we
1987                  * can cancel the upconvert request if need be.
1988                  */
1989                 ret = ocfs2_lock_create(osb, lockres, DLM_LOCK_NL, 0);
1990                 if (ret < 0) {
1991                         mlog_errno(ret);
1992                         goto out;
1993                 }
1994
1995                 ret = ocfs2_wait_for_mask(&mw);
1996                 if (ret) {
1997                         mlog_errno(ret);
1998                         goto out;
1999                 }
2000                 spin_lock_irqsave(&lockres->l_lock, flags);
2001         }
2002
2003         lockres->l_action = OCFS2_AST_CONVERT;
2004         lkm_flags |= DLM_LKF_CONVERT;
2005         lockres->l_requested = level;
2006         lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
2007
2008         lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
2009         spin_unlock_irqrestore(&lockres->l_lock, flags);
2010
2011         ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags,
2012                              lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1);
2013         if (ret) {
2014                 if (!trylock || (ret != -EAGAIN)) {
2015                         ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
2016                         ret = -EINVAL;
2017                 }
2018
2019                 ocfs2_recover_from_dlm_error(lockres, 1);
2020                 lockres_remove_mask_waiter(lockres, &mw);
2021                 goto out;
2022         }
2023
2024         ret = ocfs2_wait_for_mask_interruptible(&mw, lockres);
2025         if (ret == -ERESTARTSYS) {
2026                 /*
2027                  * Userspace can trivially deadlock itself with
2028                  * flock(). The local behavior is to allow the
2029                  * deadlock, but abort the system call if a signal is
2030                  * received. We follow this example; otherwise a
2031                  * poorly written program could sit in the kernel
2032                  * until reboot.
2033                  *
2034                  * Handling this is a bit more complicated for OCFS2,
2035                  * though. We can't exit this function with an
2036                  * outstanding lock request, so a cancel convert is
2037                  * required. We intentionally overwrite 'ret' - if the
2038                  * cancel fails and the lock was granted, it's easier
2039                  * to just bubble success back up to the user.
2040                  */
2041                 ret = ocfs2_flock_handle_signal(lockres, level);
2042         } else if (!ret && (level > lockres->l_level)) {
2043                 /* Trylock failed asynchronously */
2044                 BUG_ON(!trylock);
2045                 ret = -EAGAIN;
2046         }
2047
2048 out:
2049
2050         mlog(0, "Lock: \"%s\" ex: %d, trylock: %d, returns: %d\n",
2051              lockres->l_name, ex, trylock, ret);
2052         return ret;
2053 }
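
/*
 * Caller sketch (hedged - the real driver is the ->flock() handler in
 * locks.c; the flag mapping below is an assumption for illustration):
 *
 *	int ex = (fl->fl_type == F_WRLCK);
 *	int trylock = !(fl->fl_flags & FL_SLEEP);
 *
 *	ret = ocfs2_file_lock(file, ex, trylock);
 */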
2054
2055 void ocfs2_file_unlock(struct file *file)
2056 {
2057         int ret;
2058         unsigned int gen;
2059         unsigned long flags;
2060         struct ocfs2_file_private *fp = file->private_data;
2061         struct ocfs2_lock_res *lockres = &fp->fp_flock;
2062         struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
2063         struct ocfs2_mask_waiter mw;
2064
2065         ocfs2_init_mask_waiter(&mw);
2066
2067         if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED))
2068                 return;
2069
2070         if (lockres->l_level == DLM_LOCK_NL)
2071                 return;
2072
2073         mlog(0, "Unlock: \"%s\" flags: 0x%lx, level: %d, act: %d\n",
2074              lockres->l_name, lockres->l_flags, lockres->l_level,
2075              lockres->l_action);
2076
2077         spin_lock_irqsave(&lockres->l_lock, flags);
2078         /*
2079          * Fake a blocking ast for the downconvert code.
2080          */
2081         lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
2082         lockres->l_blocking = DLM_LOCK_EX;
2083
2084         gen = ocfs2_prepare_downconvert(lockres, DLM_LOCK_NL);
2085         lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
2086         spin_unlock_irqrestore(&lockres->l_lock, flags);
2087
2088         ret = ocfs2_downconvert_lock(osb, lockres, DLM_LOCK_NL, 0, gen);
2089         if (ret) {
2090                 mlog_errno(ret);
2091                 return;
2092         }
2093
2094         ret = ocfs2_wait_for_mask(&mw);
2095         if (ret)
2096                 mlog_errno(ret);
2097 }
2098
2099 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
2100                                         struct ocfs2_lock_res *lockres)
2101 {
2102         int kick = 0;
2103
2104         /* If we know that another node is waiting on our lock, kick
2105          * the downconvert thread pre-emptively when we reach a release
2106          * condition. */
2107         if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
2108                 switch(lockres->l_blocking) {
2109                 case DLM_LOCK_EX:
2110                         if (!lockres->l_ex_holders && !lockres->l_ro_holders)
2111                                 kick = 1;
2112                         break;
2113                 case DLM_LOCK_PR:
2114                         if (!lockres->l_ex_holders)
2115                                 kick = 1;
2116                         break;
2117                 default:
2118                         BUG();
2119                 }
2120         }
2121
2122         if (kick)
2123                 ocfs2_wake_downconvert_thread(osb);
2124 }
2125
2126 #define OCFS2_SEC_BITS   34
2127 #define OCFS2_SEC_SHIFT  (64 - OCFS2_SEC_BITS)
2128 #define OCFS2_NSEC_MASK  ((1ULL << OCFS2_SEC_SHIFT) - 1)
2129
2130 /* LVB only has room for 64 bits of time here so we pack it for
2131  * now. */
2132 static u64 ocfs2_pack_timespec(struct timespec *spec)
2133 {
2134         u64 res;
2135         u64 sec = spec->tv_sec;
2136         u32 nsec = spec->tv_nsec;
2137
2138         res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK);
2139
2140         return res;
2141 }
2142
2143 /* Call this with the lockres locked. I am reasonably sure we don't
2144  * need ip_lock in this function as anyone who would be changing those
2145  * values is supposed to be blocked in ocfs2_inode_lock right now. */
2146 static void __ocfs2_stuff_meta_lvb(struct inode *inode)
2147 {
2148         struct ocfs2_inode_info *oi = OCFS2_I(inode);
2149         struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
2150         struct ocfs2_meta_lvb *lvb;
2151         struct timespec ts;
2152
2153         lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2154
2155         /*
2156          * Invalidate the LVB of a deleted inode - this way other
2157          * nodes are forced to go to disk and discover the new inode
2158          * status.
2159          */
2160         if (oi->ip_flags & OCFS2_INODE_DELETED) {
2161                 lvb->lvb_version = 0;
2162                 goto out;
2163         }
2164
2165         lvb->lvb_version   = OCFS2_LVB_VERSION;
2166         lvb->lvb_isize     = cpu_to_be64(i_size_read(inode));
2167         lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters);
2168         lvb->lvb_iuid      = cpu_to_be32(i_uid_read(inode));
2169         lvb->lvb_igid      = cpu_to_be32(i_gid_read(inode));
2170         lvb->lvb_imode     = cpu_to_be16(inode->i_mode);
2171         lvb->lvb_inlink    = cpu_to_be16(inode->i_nlink);
2172         ts = timespec64_to_timespec(inode->i_atime);
2173         lvb->lvb_iatime_packed  =
2174                 cpu_to_be64(ocfs2_pack_timespec(&ts));
2175         ts = timespec64_to_timespec(inode->i_ctime);
2176         lvb->lvb_ictime_packed =
2177                 cpu_to_be64(ocfs2_pack_timespec(&ts));
2178         ts = timespec64_to_timespec(inode->i_mtime);
2179         lvb->lvb_imtime_packed =
2180                 cpu_to_be64(ocfs2_pack_timespec(&ts));
2181         lvb->lvb_iattr    = cpu_to_be32(oi->ip_attr);
2182         lvb->lvb_idynfeatures = cpu_to_be16(oi->ip_dyn_features);
2183         lvb->lvb_igeneration = cpu_to_be32(inode->i_generation);
2184
2185 out:
2186         mlog_meta_lvb(0, lockres);
2187 }
2188
2189 static void ocfs2_unpack_timespec(struct timespec *spec,
2190                                   u64 packed_time)
2191 {
2192         spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT;
2193         spec->tv_nsec = packed_time & OCFS2_NSEC_MASK;
2194 }
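
/*
 * Worked example (illustrative only): with OCFS2_SEC_SHIFT == 30, a
 * timespec of { .tv_sec = 1, .tv_nsec = 5 } packs to
 * (1ULL << 30) | 5 == 0x40000005.  Unpacking shifts the packed value
 * right by OCFS2_SEC_SHIFT to recover the seconds and masks the low
 * 30 bits for the nanoseconds, recovering the original values; the
 * nanoseconds always fit, since 999999999 < 2^30.
 */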
2195
2196 static void ocfs2_refresh_inode_from_lvb(struct inode *inode)
2197 {
2198         struct timespec ts;
2199         struct ocfs2_inode_info *oi = OCFS2_I(inode);
2200         struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
2201         struct ocfs2_meta_lvb *lvb;
2202
2203         mlog_meta_lvb(0, lockres);
2204
2205         lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2206
2207         /* We're safe here without the lockres lock... */
2208         spin_lock(&oi->ip_lock);
2209         oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters);
2210         i_size_write(inode, be64_to_cpu(lvb->lvb_isize));
2211
2212         oi->ip_attr = be32_to_cpu(lvb->lvb_iattr);
2213         oi->ip_dyn_features = be16_to_cpu(lvb->lvb_idynfeatures);
2214         ocfs2_set_inode_flags(inode);
2215
2216         /* fast-symlinks are a special case */
2217         if (S_ISLNK(inode->i_mode) && !oi->ip_clusters)
2218                 inode->i_blocks = 0;
2219         else
2220                 inode->i_blocks = ocfs2_inode_sector_count(inode);
2221
2222         i_uid_write(inode, be32_to_cpu(lvb->lvb_iuid));
2223         i_gid_write(inode, be32_to_cpu(lvb->lvb_igid));
2224         inode->i_mode    = be16_to_cpu(lvb->lvb_imode);
2225         set_nlink(inode, be16_to_cpu(lvb->lvb_inlink));
2226         ocfs2_unpack_timespec(&ts,
2227                               be64_to_cpu(lvb->lvb_iatime_packed));
2228         inode->i_atime = timespec_to_timespec64(ts);
2229         ocfs2_unpack_timespec(&ts,
2230                               be64_to_cpu(lvb->lvb_imtime_packed));
2231         inode->i_mtime = timespec_to_timespec64(ts);
2232         ocfs2_unpack_timespec(&ts,
2233                               be64_to_cpu(lvb->lvb_ictime_packed));
2234         inode->i_ctime = timespec_to_timespec64(ts);
2235         spin_unlock(&oi->ip_lock);
2236 }
2237
2238 static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode,
2239                                               struct ocfs2_lock_res *lockres)
2240 {
2241         struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2242
2243         if (ocfs2_dlm_lvb_valid(&lockres->l_lksb)
2244             && lvb->lvb_version == OCFS2_LVB_VERSION
2245             && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation)
2246                 return 1;
2247         return 0;
2248 }
2249
2250 /* Determine whether a lock resource needs to be refreshed, and
2251  * arbitrate who gets to refresh it.
2252  *
2253  *   0 means no refresh needed.
2254  *
2255  *   > 0 means you need to refresh this and you MUST call
2256  *   ocfs2_complete_lock_res_refresh afterwards. */
2257 static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres)
2258 {
2259         unsigned long flags;
2260         int status = 0;
2261
2262 refresh_check:
2263         spin_lock_irqsave(&lockres->l_lock, flags);
2264         if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
2265                 spin_unlock_irqrestore(&lockres->l_lock, flags);
2266                 goto bail;
2267         }
2268
2269         if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
2270                 spin_unlock_irqrestore(&lockres->l_lock, flags);
2271
2272                 ocfs2_wait_on_refreshing_lock(lockres);
2273                 goto refresh_check;
2274         }
2275
2276         /* Ok, I'll be the one to refresh this lock. */
2277         lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING);
2278         spin_unlock_irqrestore(&lockres->l_lock, flags);
2279
2280         status = 1;
2281 bail:
2282         mlog(0, "status %d\n", status);
2283         return status;
2284 }
2285
2286 /* If status is nonzero, I'll mark it as not being in refresh
2287  * anymore, but I won't clear the needs refresh flag. */
2288 static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres,
2289                                                    int status)
2290 {
2291         unsigned long flags;
2292
2293         spin_lock_irqsave(&lockres->l_lock, flags);
2294         lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING);
2295         if (!status)
2296                 lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
2297         spin_unlock_irqrestore(&lockres->l_lock, flags);
2298
2299         wake_up(&lockres->l_event);
2300 }
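
/*
 * Canonical refresh pattern (sketch; ocfs2_inode_lock_update() and
 * ocfs2_super_lock() below follow this shape - "do_refresh" here is a
 * placeholder, not a real helper):
 *
 *	status = ocfs2_should_refresh_lock_res(lockres);
 *	if (status) {
 *		status = do_refresh(...);
 *		ocfs2_complete_lock_res_refresh(lockres, status);
 *	}
 */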
2301
2302 /* May or may not return a bh in *bh if the update went to disk. */
2303 static int ocfs2_inode_lock_update(struct inode *inode,
2304                                   struct buffer_head **bh)
2305 {
2306         int status = 0;
2307         struct ocfs2_inode_info *oi = OCFS2_I(inode);
2308         struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
2309         struct ocfs2_dinode *fe;
2310         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2311
2312         if (ocfs2_mount_local(osb))
2313                 goto bail;
2314
2315         spin_lock(&oi->ip_lock);
2316         if (oi->ip_flags & OCFS2_INODE_DELETED) {
2317                 mlog(0, "Orphaned inode %llu was deleted while we "
2318                      "were waiting on a lock. ip_flags = 0x%x\n",
2319                      (unsigned long long)oi->ip_blkno, oi->ip_flags);
2320                 spin_unlock(&oi->ip_lock);
2321                 status = -ENOENT;
2322                 goto bail;
2323         }
2324         spin_unlock(&oi->ip_lock);
2325
2326         if (!ocfs2_should_refresh_lock_res(lockres))
2327                 goto bail;
2328
2329         /* This will discard any caching information we might have had
2330          * for the inode metadata. */
2331         ocfs2_metadata_cache_purge(INODE_CACHE(inode));
2332
2333         ocfs2_extent_map_trunc(inode, 0);
2334
2335         if (ocfs2_meta_lvb_is_trustable(inode, lockres)) {
2336                 mlog(0, "Trusting LVB on inode %llu\n",
2337                      (unsigned long long)oi->ip_blkno);
2338                 ocfs2_refresh_inode_from_lvb(inode);
2339         } else {
2340                 /* Boo, we have to go to disk. */
2341                 /* read bh, cast, ocfs2_refresh_inode */
2342                 status = ocfs2_read_inode_block(inode, bh);
2343                 if (status < 0) {
2344                         mlog_errno(status);
2345                         goto bail_refresh;
2346                 }
2347                 fe = (struct ocfs2_dinode *) (*bh)->b_data;
2348
2349                 /* This is a good chance to make sure we're not
2350                  * locking an invalid object.  ocfs2_read_inode_block()
2351                  * already checked that the inode block is sane.
2352                  *
2353                  * We bug on a stale inode here because we checked
2354                  * above whether it was wiped from disk. The wiping
2355                  * node provides a guarantee that we receive that
2356                  * message and can mark the inode before dropping any
2357                  * locks associated with it. */
2358                 mlog_bug_on_msg(inode->i_generation !=
2359                                 le32_to_cpu(fe->i_generation),
2360                                 "Invalid dinode %llu disk generation: %u "
2361                                 "inode->i_generation: %u\n",
2362                                 (unsigned long long)oi->ip_blkno,
2363                                 le32_to_cpu(fe->i_generation),
2364                                 inode->i_generation);
2365                 mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) ||
2366                                 !(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)),
2367                                 "Stale dinode %llu dtime: %llu flags: 0x%x\n",
2368                                 (unsigned long long)oi->ip_blkno,
2369                                 (unsigned long long)le64_to_cpu(fe->i_dtime),
2370                                 le32_to_cpu(fe->i_flags));
2371
2372                 ocfs2_refresh_inode(inode, fe);
2373                 ocfs2_track_lock_refresh(lockres);
2374         }
2375
2376         status = 0;
2377 bail_refresh:
2378         ocfs2_complete_lock_res_refresh(lockres, status);
2379 bail:
2380         return status;
2381 }
2382
2383 static int ocfs2_assign_bh(struct inode *inode,
2384                            struct buffer_head **ret_bh,
2385                            struct buffer_head *passed_bh)
2386 {
2387         int status;
2388
2389         if (passed_bh) {
2390                 /* Ok, the update went to disk for us, use the
2391                  * returned bh. */
2392                 *ret_bh = passed_bh;
2393                 get_bh(*ret_bh);
2394
2395                 return 0;
2396         }
2397
2398         status = ocfs2_read_inode_block(inode, ret_bh);
2399         if (status < 0)
2400                 mlog_errno(status);
2401
2402         return status;
2403 }
2404
2405 /*
2406  * returns < 0 error if the callback will never be called, otherwise
2407  * the result of the lock will be communicated via the callback.
2408  */
2409 int ocfs2_inode_lock_full_nested(struct inode *inode,
2410                                  struct buffer_head **ret_bh,
2411                                  int ex,
2412                                  int arg_flags,
2413                                  int subclass)
2414 {
2415         int status, level, acquired;
2416         u32 dlm_flags;
2417         struct ocfs2_lock_res *lockres = NULL;
2418         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2419         struct buffer_head *local_bh = NULL;
2420
2421         mlog(0, "inode %llu, take %s META lock\n",
2422              (unsigned long long)OCFS2_I(inode)->ip_blkno,
2423              ex ? "EXMODE" : "PRMODE");
2424
2425         status = 0;
2426         acquired = 0;
2427         /* We'll allow faking a readonly metadata lock for
2428          * read-only devices. */
2429         if (ocfs2_is_hard_readonly(osb)) {
2430                 if (ex)
2431                         status = -EROFS;
2432                 goto getbh;
2433         }
2434
2435         if ((arg_flags & OCFS2_META_LOCK_GETBH) ||
2436             ocfs2_mount_local(osb))
2437                 goto update;
2438
2439         if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
2440                 ocfs2_wait_for_recovery(osb);
2441
2442         lockres = &OCFS2_I(inode)->ip_inode_lockres;
2443         level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2444         dlm_flags = 0;
2445         if (arg_flags & OCFS2_META_LOCK_NOQUEUE)
2446                 dlm_flags |= DLM_LKF_NOQUEUE;
2447
2448         status = __ocfs2_cluster_lock(osb, lockres, level, dlm_flags,
2449                                       arg_flags, subclass, _RET_IP_);
2450         if (status < 0) {
2451                 if (status != -EAGAIN)
2452                         mlog_errno(status);
2453                 goto bail;
2454         }
2455
2456         /* Notify the error cleanup path to drop the cluster lock. */
2457         acquired = 1;
2458
2459         /* We wait twice because a node may have died while we were in
2460          * the lower dlm layers. The second time though, we've
2461          * committed to owning this lock so we don't allow signals to
2462          * abort the operation. */
2463         if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
2464                 ocfs2_wait_for_recovery(osb);
2465
2466 update:
2467         /*
2468          * We only see this flag if we're being called from
2469          * ocfs2_read_locked_inode(). It means we're locking an inode
2470          * which hasn't been populated yet, so clear the refresh flag
2471          * and let the caller handle it.
2472          */
2473         if (inode->i_state & I_NEW) {
2474                 status = 0;
2475                 if (lockres)
2476                         ocfs2_complete_lock_res_refresh(lockres, 0);
2477                 goto bail;
2478         }
2479
2480         /* This is fun. The caller may want a bh back, or it may
2481          * not. ocfs2_inode_lock_update definitely wants one in, but
2482          * may or may not read one, depending on what's in the
2483          * LVB. The result of all of this is that we've *only* gone to
2484          * disk if we have to, so the complexity is worthwhile. */
2485         status = ocfs2_inode_lock_update(inode, &local_bh);
2486         if (status < 0) {
2487                 if (status != -ENOENT)
2488                         mlog_errno(status);
2489                 goto bail;
2490         }
2491 getbh:
2492         if (ret_bh) {
2493                 status = ocfs2_assign_bh(inode, ret_bh, local_bh);
2494                 if (status < 0) {
2495                         mlog_errno(status);
2496                         goto bail;
2497                 }
2498         }
2499
2500 bail:
2501         if (status < 0) {
2502                 if (ret_bh && (*ret_bh)) {
2503                         brelse(*ret_bh);
2504                         *ret_bh = NULL;
2505                 }
2506                 if (acquired)
2507                         ocfs2_inode_unlock(inode, ex);
2508         }
2509
2510         if (local_bh)
2511                 brelse(local_bh);
2512
2513         return status;
2514 }
2515
2516 /*
2517  * This is working around a lock inversion between tasks acquiring DLM
2518  * locks while holding a page lock and the downconvert thread which
2519  * blocks dlm lock acquisition while acquiring page locks.
2520  *
2521  * ** These _with_page variants are only intended to be called from aop
2522  * methods that hold page locks and return a very specific *positive* error
2523  * code that aop methods pass up to the VFS -- test for errors with != 0. **
2524  *
2525  * The DLM is called such that it returns -EAGAIN if it would have
2526  * blocked waiting for the downconvert thread.  In that case we unlock
2527  * our page so the downconvert thread can make progress.  Once we've
2528  * done this we have to return AOP_TRUNCATED_PAGE so the aop method
2529  * that called us can bubble that back up into the VFS who will then
2530  * immediately retry the aop call.
2531  */
2532 int ocfs2_inode_lock_with_page(struct inode *inode,
2533                               struct buffer_head **ret_bh,
2534                               int ex,
2535                               struct page *page)
2536 {
2537         int ret;
2538
2539         ret = ocfs2_inode_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK);
2540         if (ret == -EAGAIN) {
2541                 unlock_page(page);
2542                 /*
2543                  * If we can't get the inode lock immediately, we should not
2544                  * return directly here, since that leads to a softlockup.
2545                  * Instead, take a blocking lock and immediately unlock it
2546                  * before returning; this avoids wasting CPU on endless
2547                  * retries and improves fairness in acquiring the lock.
2548                  */
2549                 if (ocfs2_inode_lock(inode, ret_bh, ex) == 0)
2550                         ocfs2_inode_unlock(inode, ex);
2551                 ret = AOP_TRUNCATED_PAGE;
2552         }
2553
2554         return ret;
2555 }
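
/*
 * Example aop-side handling (sketch; compare the readpage path in
 * aops.c - the exact call site shown here is an assumption):
 *
 *	ret = ocfs2_inode_lock_with_page(inode, NULL, 0, page);
 *	if (ret != 0) {
 *		if (ret != AOP_TRUNCATED_PAGE)
 *			mlog_errno(ret);
 *		goto out;	-- AOP_TRUNCATED_PAGE makes the VFS retry
 *	}
 */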
2556
2557 int ocfs2_inode_lock_atime(struct inode *inode,
2558                           struct vfsmount *vfsmnt,
2559                           int *level, int wait)
2560 {
2561         int ret;
2562
2563         if (wait)
2564                 ret = ocfs2_inode_lock(inode, NULL, 0);
2565         else
2566                 ret = ocfs2_try_inode_lock(inode, NULL, 0);
2567
2568         if (ret < 0) {
2569                 if (ret != -EAGAIN)
2570                         mlog_errno(ret);
2571                 return ret;
2572         }
2573
2574         /*
2575          * If we should update atime, we will get EX lock,
2576          * otherwise we just get PR lock.
2577          */
2578         if (ocfs2_should_update_atime(inode, vfsmnt)) {
2579                 struct buffer_head *bh = NULL;
2580
2581                 ocfs2_inode_unlock(inode, 0);
2582                 if (wait)
2583                         ret = ocfs2_inode_lock(inode, &bh, 1);
2584                 else
2585                         ret = ocfs2_try_inode_lock(inode, &bh, 1);
2586
2587                 if (ret < 0) {
2588                         if (ret != -EAGAIN)
2589                                 mlog_errno(ret);
2590                         return ret;
2591                 }
2592                 *level = 1;
2593                 if (ocfs2_should_update_atime(inode, vfsmnt))
2594                         ocfs2_update_inode_atime(inode, bh);
2595                 if (bh)
2596                         brelse(bh);
2597         } else
2598                 *level = 0;
2599
2600         return ret;
2601 }
2602
2603 void ocfs2_inode_unlock(struct inode *inode,
2604                        int ex)
2605 {
2606         int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2607         struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres;
2608         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2609
2610         mlog(0, "inode %llu drop %s META lock\n",
2611              (unsigned long long)OCFS2_I(inode)->ip_blkno,
2612              ex ? "EXMODE" : "PRMODE");
2613
2614         if (!ocfs2_is_hard_readonly(osb) &&
2615             !ocfs2_mount_local(osb))
2616                 ocfs2_cluster_unlock(osb, lockres, level);
2617 }
2618
2619 /*
2620  * These _tracker variants are introduced to deal with the recursive cluster
2621  * locking issue. The idea is to keep track of a lock holder on the stack of
2622  * the current process. If there's a lock holder on the stack, we know the
2623  * task context is already protected by cluster locking. Currently, they're
2624  * used in some VFS entry routines.
2625  *
2626  * return < 0 on error, return == 0 if there's no lock holder on the stack
2627  * before this call, return == 1 if this call would be a recursive locking.
2628  * return == -1 if this lock attempt will cause an upgrade which is forbidden.
2629  *
2630  * When taking lock levels into account, we face several different situations.
2631  *
2632  * 1. no lock is held
2633  *    In this case, just lock the inode as requested and return 0
2634  *
2635  * 2. We are holding a lock
2636  *    For this situation, things diverge into several cases:
2637  *
2638  *    wanted     holding     what to do
2639  *    ex         ex          see 2.1 below
2640  *    ex         pr          see 2.2 below
2641  *    pr         ex          see 2.1 below
2642  *    pr         pr          see 2.1 below
2643  *
2644  *    2.1 The lock level being held is compatible with the wanted
2645  *    level, so no lock action will be taken.
2646  *
2647  *    2.2 Otherwise, an upgrade is needed, but it is forbidden.
2648  *
2649  * The reason an upgrade within a process is forbidden is that a
2650  * lock upgrade may cause deadlock. The following illustrates
2651  * how it happens.
2652  *
2653  *         thread on node1                             thread on node2
2654  * ocfs2_inode_lock_tracker(ex=0)
2655  *
2656  *                                <======   ocfs2_inode_lock_tracker(ex=1)
2657  *
2658  * ocfs2_inode_lock_tracker(ex=1)
2659  */
2660 int ocfs2_inode_lock_tracker(struct inode *inode,
2661                              struct buffer_head **ret_bh,
2662                              int ex,
2663                              struct ocfs2_lock_holder *oh)
2664 {
2665         int status = 0;
2666         struct ocfs2_lock_res *lockres;
2667         struct ocfs2_lock_holder *tmp_oh;
2668         struct pid *pid = task_pid(current);
2669
2670
2671         lockres = &OCFS2_I(inode)->ip_inode_lockres;
2672         tmp_oh = ocfs2_pid_holder(lockres, pid);
2673
2674         if (!tmp_oh) {
2675                 /*
2676                  * This corresponds to the case 1.
2677                  * We haven't got any lock before.
2678                  */
2679                 status = ocfs2_inode_lock_full(inode, ret_bh, ex, 0);
2680                 if (status < 0) {
2681                         if (status != -ENOENT)
2682                                 mlog_errno(status);
2683                         return status;
2684                 }
2685
2686                 oh->oh_ex = ex;
2687                 ocfs2_add_holder(lockres, oh);
2688                 return 0;
2689         }
2690
2691         if (unlikely(ex && !tmp_oh->oh_ex)) {
2692                 /*
2693                  * case 2.2 upgrade may cause dead lock, forbid it.
2694                  */
2695                 mlog(ML_ERROR, "Recursive locking is not permitted to "
2696                      "upgrade to EX level from PR level.\n");
2697                 dump_stack();
2698                 return -EINVAL;
2699         }
2700
2701         /*
2702          * Case 2.1: the OCFS2_META_LOCK_GETBH flag makes ocfs2_inode_lock_full
2703          * ignore the lock level and just update the buffer head.
2704          */
2705         if (ret_bh) {
2706                 status = ocfs2_inode_lock_full(inode, ret_bh, ex,
2707                                                OCFS2_META_LOCK_GETBH);
2708                 if (status < 0) {
2709                         if (status != -ENOENT)
2710                                 mlog_errno(status);
2711                         return status;
2712                 }
2713         }
2714         return tmp_oh ? 1 : 0;
2715 }
2716
2717 void ocfs2_inode_unlock_tracker(struct inode *inode,
2718                                 int ex,
2719                                 struct ocfs2_lock_holder *oh,
2720                                 int had_lock)
2721 {
2722         struct ocfs2_lock_res *lockres;
2723
2724         lockres = &OCFS2_I(inode)->ip_inode_lockres;
2725         /* had_lock means that the current process already took the cluster
2726          * lock previously.
2727          * If had_lock is 1, we have nothing to do here.
2728          * If had_lock is 0, we will release the lock.
2729          */
2730         if (!had_lock) {
2731                 ocfs2_inode_unlock(inode, oh->oh_ex);
2732                 ocfs2_remove_holder(lockres, oh);
2733         }
2734 }
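
/*
 * Typical tracker usage (sketch of a VFS entry point; the variable
 * names are illustrative):
 *
 *	struct ocfs2_lock_holder oh;
 *	int had_lock;
 *
 *	had_lock = ocfs2_inode_lock_tracker(inode, NULL, 0, &oh);
 *	if (had_lock < 0)
 *		return had_lock;
 *	... do work under the cluster lock ...
 *	ocfs2_inode_unlock_tracker(inode, 0, &oh, had_lock);
 */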
2735
2736 int ocfs2_orphan_scan_lock(struct ocfs2_super *osb, u32 *seqno)
2737 {
2738         struct ocfs2_lock_res *lockres;
2739         struct ocfs2_orphan_scan_lvb *lvb;
2740         int status = 0;
2741
2742         if (ocfs2_is_hard_readonly(osb))
2743                 return -EROFS;
2744
2745         if (ocfs2_mount_local(osb))
2746                 return 0;
2747
2748         lockres = &osb->osb_orphan_scan.os_lockres;
2749         status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0);
2750         if (status < 0)
2751                 return status;
2752
2753         lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2754         if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) &&
2755             lvb->lvb_version == OCFS2_ORPHAN_LVB_VERSION)
2756                 *seqno = be32_to_cpu(lvb->lvb_os_seqno);
2757         else
2758                 *seqno = osb->osb_orphan_scan.os_seqno + 1;
2759
2760         return status;
2761 }
2762
2763 void ocfs2_orphan_scan_unlock(struct ocfs2_super *osb, u32 seqno)
2764 {
2765         struct ocfs2_lock_res *lockres;
2766         struct ocfs2_orphan_scan_lvb *lvb;
2767
2768         if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) {
2769                 lockres = &osb->osb_orphan_scan.os_lockres;
2770                 lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2771                 lvb->lvb_version = OCFS2_ORPHAN_LVB_VERSION;
2772                 lvb->lvb_os_seqno = cpu_to_be32(seqno);
2773                 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
2774         }
2775 }
2776
2777 int ocfs2_super_lock(struct ocfs2_super *osb,
2778                      int ex)
2779 {
2780         int status = 0;
2781         int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2782         struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
2783
2784         if (ocfs2_is_hard_readonly(osb))
2785                 return -EROFS;
2786
2787         if (ocfs2_mount_local(osb))
2788                 goto bail;
2789
2790         status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
2791         if (status < 0) {
2792                 mlog_errno(status);
2793                 goto bail;
2794         }
2795
2796         /* The super block lock path is really in the best position to
2797          * know when resources covered by the lock need to be
2798          * refreshed, so we do it here. Of course, making sense of
2799          * everything is up to the caller :) */
2800         status = ocfs2_should_refresh_lock_res(lockres);
2801         if (status) {
2802                 status = ocfs2_refresh_slot_info(osb);
2803
2804                 ocfs2_complete_lock_res_refresh(lockres, status);
2805
2806                 if (status < 0) {
2807                         ocfs2_cluster_unlock(osb, lockres, level);
2808                         mlog_errno(status);
2809                 }
2810                 ocfs2_track_lock_refresh(lockres);
2811         }
2812 bail:
2813         return status;
2814 }
2815
2816 void ocfs2_super_unlock(struct ocfs2_super *osb,
2817                         int ex)
2818 {
2819         int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2820         struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
2821
2822         if (!ocfs2_mount_local(osb))
2823                 ocfs2_cluster_unlock(osb, lockres, level);
2824 }
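
/*
 * Illustrative pairing for the super lock (a sketch only):
 *
 *	status = ocfs2_super_lock(osb, 1);	(1 = EX, 0 = PR)
 *	if (status < 0)
 *		goto out;
 *	... work that relies on fresh slot-map information ...
 *	ocfs2_super_unlock(osb, 1);
 *
 * Note that a failed slot-map refresh inside ocfs2_super_lock() drops
 * the cluster lock again before returning the error, so callers must
 * not unlock on failure.
 */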
2825
2826 int ocfs2_rename_lock(struct ocfs2_super *osb)
2827 {
2828         int status;
2829         struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
2830
2831         if (ocfs2_is_hard_readonly(osb))
2832                 return -EROFS;
2833
2834         if (ocfs2_mount_local(osb))
2835                 return 0;
2836
2837         status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0);
2838         if (status < 0)
2839                 mlog_errno(status);
2840
2841         return status;
2842 }
2843
2844 void ocfs2_rename_unlock(struct ocfs2_super *osb)
2845 {
2846         struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
2847
2848         if (!ocfs2_mount_local(osb))
2849                 ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
2850 }
2851
2852 int ocfs2_nfs_sync_lock(struct ocfs2_super *osb, int ex)
2853 {
2854         int status;
2855         struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres;
2856
2857         if (ocfs2_is_hard_readonly(osb))
2858                 return -EROFS;
2859
2860         if (ex)
2861                 down_write(&osb->nfs_sync_rwlock);
2862         else
2863                 down_read(&osb->nfs_sync_rwlock);
2864
2865         if (ocfs2_mount_local(osb))
2866                 return 0;
2867
2868         status = ocfs2_cluster_lock(osb, lockres, ex ? LKM_EXMODE : LKM_PRMODE,
2869                                     0, 0);
2870         if (status < 0) {
2871                 mlog(ML_ERROR, "lock on nfs sync lock failed %d\n", status);
2872
2873                 if (ex)
2874                         up_write(&osb->nfs_sync_rwlock);
2875                 else
2876                         up_read(&osb->nfs_sync_rwlock);
2877         }
2878
2879         return status;
2880 }
2881
2882 void ocfs2_nfs_sync_unlock(struct ocfs2_super *osb, int ex)
2883 {
2884         struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres;
2885
2886         if (!ocfs2_mount_local(osb))
2887                 ocfs2_cluster_unlock(osb, lockres,
2888                                      ex ? LKM_EXMODE : LKM_PRMODE);
2889         if (ex)
2890                 up_write(&osb->nfs_sync_rwlock);
2891         else
2892                 up_read(&osb->nfs_sync_rwlock);
2893 }
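
/*
 * Note the ordering in the pair above: the local nfs_sync_rwlock is
 * always outermost - taken before the cluster lock and released after
 * it - and the lock path unwinds the rwsem itself if the cluster lock
 * fails. A minimal caller sketch (illustrative):
 *
 *	status = ocfs2_nfs_sync_lock(osb, 1);	(EX where required)
 *	if (status < 0)
 *		return status;
 *	...
 *	ocfs2_nfs_sync_unlock(osb, 1);
 */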
2894
2895 int ocfs2_trim_fs_lock(struct ocfs2_super *osb,
2896                        struct ocfs2_trim_fs_info *info, int trylock)
2897 {
2898         int status;
2899         struct ocfs2_trim_fs_lvb *lvb;
2900         struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres;
2901
2902         if (info)
2903                 info->tf_valid = 0;
2904
2905         if (ocfs2_is_hard_readonly(osb))
2906                 return -EROFS;
2907
2908         if (ocfs2_mount_local(osb))
2909                 return 0;
2910
2911         status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX,
2912                                     trylock ? DLM_LKF_NOQUEUE : 0, 0);
2913         if (status < 0) {
2914                 if (status != -EAGAIN)
2915                         mlog_errno(status);
2916                 return status;
2917         }
2918
2919         if (info) {
2920                 lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2921                 if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) &&
2922                     lvb->lvb_version == OCFS2_TRIMFS_LVB_VERSION) {
2923                         info->tf_valid = 1;
2924                         info->tf_success = lvb->lvb_success;
2925                         info->tf_nodenum = be32_to_cpu(lvb->lvb_nodenum);
2926                         info->tf_start = be64_to_cpu(lvb->lvb_start);
2927                         info->tf_len = be64_to_cpu(lvb->lvb_len);
2928                         info->tf_minlen = be64_to_cpu(lvb->lvb_minlen);
2929                         info->tf_trimlen = be64_to_cpu(lvb->lvb_trimlen);
2930                 }
2931         }
2932
2933         return status;
2934 }
2935
2936 void ocfs2_trim_fs_unlock(struct ocfs2_super *osb,
2937                           struct ocfs2_trim_fs_info *info)
2938 {
2939         struct ocfs2_trim_fs_lvb *lvb;
2940         struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres;
2941
2942         if (ocfs2_mount_local(osb))
2943                 return;
2944
2945         if (info) {
2946                 lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2947                 lvb->lvb_version = OCFS2_TRIMFS_LVB_VERSION;
2948                 lvb->lvb_success = info->tf_success;
2949                 lvb->lvb_nodenum = cpu_to_be32(info->tf_nodenum);
2950                 lvb->lvb_start = cpu_to_be64(info->tf_start);
2951                 lvb->lvb_len = cpu_to_be64(info->tf_len);
2952                 lvb->lvb_minlen = cpu_to_be64(info->tf_minlen);
2953                 lvb->lvb_trimlen = cpu_to_be64(info->tf_trimlen);
2954         }
2955
2956         ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
2957 }
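
/*
 * Illustrative trylock pattern for the pair above (a sketch; the real
 * fstrim path may differ in detail): probe with NOQUEUE first so we
 * can detect a trim already running on another node, then use the LVB
 * snapshot to avoid repeating work that node just finished.
 *
 *	struct ocfs2_trim_fs_info info;
 *
 *	ret = ocfs2_trim_fs_lock(osb, &info, 1);
 *	if (ret == -EAGAIN)
 *		ret = ocfs2_trim_fs_lock(osb, &info, 0);  (wait our turn)
 *	if (ret < 0)
 *		return ret;
 *	if (info.tf_valid && info.tf_success && (range already covered))
 *		goto unlock;
 *	... perform the trim and record the results in info ...
 * unlock:
 *	ocfs2_trim_fs_unlock(osb, &info);
 */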
2958
2959 int ocfs2_dentry_lock(struct dentry *dentry, int ex)
2960 {
2961         int ret;
2962         int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2963         struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
2964         struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
2965
2966         BUG_ON(!dl);
2967
2968         if (ocfs2_is_hard_readonly(osb)) {
2969                 if (ex)
2970                         return -EROFS;
2971                 return 0;
2972         }
2973
2974         if (ocfs2_mount_local(osb))
2975                 return 0;
2976
2977         ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0);
2978         if (ret < 0)
2979                 mlog_errno(ret);
2980
2981         return ret;
2982 }
2983
2984 void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
2985 {
2986         int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2987         struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
2988         struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
2989
2990         if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb))
2991                 ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
2992 }
2993
2994 /* Reference counting of the dlm debug structure. We want this because
2995  * open references on the debug inodes can live on after unmount, so
2996  * we can't rely on the ocfs2_super to always exist. */
2997 static void ocfs2_dlm_debug_free(struct kref *kref)
2998 {
2999         struct ocfs2_dlm_debug *dlm_debug;
3000
3001         dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt);
3002
3003         kfree(dlm_debug);
3004 }
3005
3006 void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug)
3007 {
3008         if (dlm_debug)
3009                 kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free);
3010 }
3011
3012 static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug)
3013 {
3014         kref_get(&debug->d_refcnt);
3015 }
3016
3017 struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void)
3018 {
3019         struct ocfs2_dlm_debug *dlm_debug;
3020
3021         dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL);
3022         if (!dlm_debug) {
3023                 mlog_errno(-ENOMEM);
3024                 goto out;
3025         }
3026
3027         kref_init(&dlm_debug->d_refcnt);
3028         INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking);
3029         dlm_debug->d_locking_state = NULL;
3030 out:
3031         return dlm_debug;
3032 }
3033
3034 /* Access to this is arbitrated for us via the seq_file mutex. */
3035 struct ocfs2_dlm_seq_priv {
3036         struct ocfs2_dlm_debug *p_dlm_debug;
3037         struct ocfs2_lock_res p_iter_res;
3038         struct ocfs2_lock_res p_tmp_res;
3039 };
3040
3041 static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start,
3042                                                  struct ocfs2_dlm_seq_priv *priv)
3043 {
3044         struct ocfs2_lock_res *iter, *ret = NULL;
3045         struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug;
3046
3047         assert_spin_locked(&ocfs2_dlm_tracking_lock);
3048
3049         list_for_each_entry(iter, &start->l_debug_list, l_debug_list) {
3050                 /* discover the head of the list */
3051                 if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) {
3052                         mlog(0, "End of list found, %p\n", ret);
3053                         break;
3054                 }
3055
3056                 /* We track our "dummy" iteration lockres by its NULL
3057                  * l_ops field. */
3058                 if (iter->l_ops != NULL) {
3059                         ret = iter;
3060                         break;
3061                 }
3062         }
3063
3064         return ret;
3065 }
3066
3067 static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos)
3068 {
3069         struct ocfs2_dlm_seq_priv *priv = m->private;
3070         struct ocfs2_lock_res *iter;
3071
3072         spin_lock(&ocfs2_dlm_tracking_lock);
3073         iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv);
3074         if (iter) {
3075                 /* Since a lockres has the lifetime of its container
3076                  * (which can be an inode, an ocfs2_super, etc.) we want to
3077                  * copy this out to a temporary lockres while still
3078                  * under the spinlock. Obviously after this we can't
3079                  * trust any pointers in the copy returned, but that's
3080                  * ok as the information we want isn't typically held
3081                  * in them. */
3082                 priv->p_tmp_res = *iter;
3083                 iter = &priv->p_tmp_res;
3084         }
3085         spin_unlock(&ocfs2_dlm_tracking_lock);
3086
3087         return iter;
3088 }
3089
3090 static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v)
3091 {
3092 }
3093
3094 static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos)
3095 {
3096         struct ocfs2_dlm_seq_priv *priv = m->private;
3097         struct ocfs2_lock_res *iter = v;
3098         struct ocfs2_lock_res *dummy = &priv->p_iter_res;
3099
3100         spin_lock(&ocfs2_dlm_tracking_lock);
3101         iter = ocfs2_dlm_next_res(iter, priv);
3102         list_del_init(&dummy->l_debug_list);
3103         if (iter) {
3104                 list_add(&dummy->l_debug_list, &iter->l_debug_list);
3105                 priv->p_tmp_res = *iter;
3106                 iter = &priv->p_tmp_res;
3107         }
3108         spin_unlock(&ocfs2_dlm_tracking_lock);
3109
3110         return iter;
3111 }
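
/*
 * The two iterator callbacks above share one trick: priv->p_iter_res is
 * a "dummy" lockres (NULL l_ops) kept linked into d_lockres_tracking as
 * a position marker. Each step unlinks the dummy, re-links it right
 * after the lockres being returned, and hands the caller a stable copy
 * in p_tmp_res, so nothing is dereferenced after the tracking spinlock
 * is dropped even if the real lockres is freed in the meantime.
 */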
3112
3113 /*
3114  * Version is used by debugfs.ocfs2 to determine the format being used
3115  *
3116  * New in version 2
3117  *      - Lock stats printed
3118  * New in version 3
3119  *      - Max time in lock stats is in usecs (instead of nsecs)
3120  */
3121 #define OCFS2_DLM_DEBUG_STR_VERSION 3
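
/*
 * For reference (descriptive only - the seq_printf calls below are
 * authoritative): each record is one tab-separated line of
 *	version, lock name, level, flags, action, unlock_action,
 *	ro_holders, ex_holders, requested, blocking,
 * followed by the raw LVB bytes and, since format version 2, the
 * per-mode get/fail/total/max statistics plus the refresh count.
 */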
3122 static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
3123 {
3124         int i;
3125         char *lvb;
3126         struct ocfs2_lock_res *lockres = v;
3127
3128         if (!lockres)
3129                 return -EINVAL;
3130
3131         seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION);
3132
3133         if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY)
3134                 seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1,
3135                            lockres->l_name,
3136                            (unsigned int)ocfs2_get_dentry_lock_ino(lockres));
3137         else
3138                 seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name);
3139
3140         seq_printf(m, "%d\t"
3141                    "0x%lx\t"
3142                    "0x%x\t"
3143                    "0x%x\t"
3144                    "%u\t"
3145                    "%u\t"
3146                    "%d\t"
3147                    "%d\t",
3148                    lockres->l_level,
3149                    lockres->l_flags,
3150                    lockres->l_action,
3151                    lockres->l_unlock_action,
3152                    lockres->l_ro_holders,
3153                    lockres->l_ex_holders,
3154                    lockres->l_requested,
3155                    lockres->l_blocking);
3156
3157         /* Dump the raw LVB */
3158         lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
3159         for (i = 0; i < DLM_LVB_LEN; i++)
3160                 seq_printf(m, "0x%x\t", lvb[i]);
3161
3162 #ifdef CONFIG_OCFS2_FS_STATS
3163 # define lock_num_prmode(_l)            ((_l)->l_lock_prmode.ls_gets)
3164 # define lock_num_exmode(_l)            ((_l)->l_lock_exmode.ls_gets)
3165 # define lock_num_prmode_failed(_l)     ((_l)->l_lock_prmode.ls_fail)
3166 # define lock_num_exmode_failed(_l)     ((_l)->l_lock_exmode.ls_fail)
3167 # define lock_total_prmode(_l)          ((_l)->l_lock_prmode.ls_total)
3168 # define lock_total_exmode(_l)          ((_l)->l_lock_exmode.ls_total)
3169 # define lock_max_prmode(_l)            ((_l)->l_lock_prmode.ls_max)
3170 # define lock_max_exmode(_l)            ((_l)->l_lock_exmode.ls_max)
3171 # define lock_refresh(_l)               ((_l)->l_lock_refresh)
3172 #else
3173 # define lock_num_prmode(_l)            (0)
3174 # define lock_num_exmode(_l)            (0)
3175 # define lock_num_prmode_failed(_l)     (0)
3176 # define lock_num_exmode_failed(_l)     (0)
3177 # define lock_total_prmode(_l)          (0ULL)
3178 # define lock_total_exmode(_l)          (0ULL)
3179 # define lock_max_prmode(_l)            (0)
3180 # define lock_max_exmode(_l)            (0)
3181 # define lock_refresh(_l)               (0)
3182 #endif
3183         /* The following seq_print was added in version 2 of this output */
3184         seq_printf(m, "%u\t"
3185                    "%u\t"
3186                    "%u\t"
3187                    "%u\t"
3188                    "%llu\t"
3189                    "%llu\t"
3190                    "%u\t"
3191                    "%u\t"
3192                    "%u\t",
3193                    lock_num_prmode(lockres),
3194                    lock_num_exmode(lockres),
3195                    lock_num_prmode_failed(lockres),
3196                    lock_num_exmode_failed(lockres),
3197                    lock_total_prmode(lockres),
3198                    lock_total_exmode(lockres),
3199                    lock_max_prmode(lockres),
3200                    lock_max_exmode(lockres),
3201                    lock_refresh(lockres));
3202
3203         /* End the line */
3204         seq_printf(m, "\n");
3205         return 0;
3206 }
3207
3208 static const struct seq_operations ocfs2_dlm_seq_ops = {
3209         .start =        ocfs2_dlm_seq_start,
3210         .stop =         ocfs2_dlm_seq_stop,
3211         .next =         ocfs2_dlm_seq_next,
3212         .show =         ocfs2_dlm_seq_show,
3213 };
3214
3215 static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file)
3216 {
3217         struct seq_file *seq = file->private_data;
3218         struct ocfs2_dlm_seq_priv *priv = seq->private;
3219         struct ocfs2_lock_res *res = &priv->p_iter_res;
3220
3221         ocfs2_remove_lockres_tracking(res);
3222         ocfs2_put_dlm_debug(priv->p_dlm_debug);
3223         return seq_release_private(inode, file);
3224 }
3225
3226 static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file)
3227 {
3228         struct ocfs2_dlm_seq_priv *priv;
3229         struct ocfs2_super *osb;
3230
3231         priv = __seq_open_private(file, &ocfs2_dlm_seq_ops, sizeof(*priv));
3232         if (!priv) {
3233                 mlog_errno(-ENOMEM);
3234                 return -ENOMEM;
3235         }
3236
3237         osb = inode->i_private;
3238         ocfs2_get_dlm_debug(osb->osb_dlm_debug);
3239         priv->p_dlm_debug = osb->osb_dlm_debug;
3240         INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list);
3241
3242         ocfs2_add_lockres_tracking(&priv->p_iter_res,
3243                                    priv->p_dlm_debug);
3244
3245         return 0;
3246 }
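
/*
 * Userspace reads this file through debugfs, e.g. (illustrative,
 * assuming debugfs is mounted in the usual place):
 *
 *	cat /sys/kernel/debug/ocfs2/<uuid>/locking_state
 *
 * debugfs.ocfs2 decodes the same stream using the version field above.
 */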
3247
3248 static const struct file_operations ocfs2_dlm_debug_fops = {
3249         .open =         ocfs2_dlm_debug_open,
3250         .release =      ocfs2_dlm_debug_release,
3251         .read =         seq_read,
3252         .llseek =       seq_lseek,
3253 };
3254
3255 static int ocfs2_dlm_init_debug(struct ocfs2_super *osb)
3256 {
3257         int ret = 0;
3258         struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
3259
3260         dlm_debug->d_locking_state = debugfs_create_file("locking_state",
3261                                                          S_IFREG|S_IRUSR,
3262                                                          osb->osb_debug_root,
3263                                                          osb,
3264                                                          &ocfs2_dlm_debug_fops);
3265         if (!dlm_debug->d_locking_state) {
3266                 ret = -EINVAL;
3267                 mlog(ML_ERROR,
3268                      "Unable to create locking state debugfs file.\n");
3269                 goto out;
3270         }
3271
3272         ocfs2_get_dlm_debug(dlm_debug);
3273 out:
3274         return ret;
3275 }
3276
3277 static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb)
3278 {
3279         struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
3280
3281         if (dlm_debug) {
3282                 debugfs_remove(dlm_debug->d_locking_state);
3283                 ocfs2_put_dlm_debug(dlm_debug);
3284         }
3285 }
3286
3287 int ocfs2_dlm_init(struct ocfs2_super *osb)
3288 {
3289         int status = 0;
3290         struct ocfs2_cluster_connection *conn = NULL;
3291
3292         if (ocfs2_mount_local(osb)) {
3293                 osb->node_num = 0;
3294                 goto local;
3295         }
3296
3297         status = ocfs2_dlm_init_debug(osb);
3298         if (status < 0) {
3299                 mlog_errno(status);
3300                 goto bail;
3301         }
3302
3303         /* launch downconvert thread */
3304         osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc-%s",
3305                         osb->uuid_str);
3306         if (IS_ERR(osb->dc_task)) {
3307                 status = PTR_ERR(osb->dc_task);
3308                 osb->dc_task = NULL;
3309                 mlog_errno(status);
3310                 goto bail;
3311         }
3312
3313         /* for now, uuid == domain */
3314         status = ocfs2_cluster_connect(osb->osb_cluster_stack,
3315                                        osb->osb_cluster_name,
3316                                        strlen(osb->osb_cluster_name),
3317                                        osb->uuid_str,
3318                                        strlen(osb->uuid_str),
3319                                        &lproto, ocfs2_do_node_down, osb,
3320                                        &conn);
3321         if (status) {
3322                 mlog_errno(status);
3323                 goto bail;
3324         }
3325
3326         status = ocfs2_cluster_this_node(conn, &osb->node_num);
3327         if (status < 0) {
3328                 mlog_errno(status);
3329                 mlog(ML_ERROR,
3330                      "could not find this host's node number\n");
3331                 ocfs2_cluster_disconnect(conn, 0);
3332                 goto bail;
3333         }
3334
3335 local:
3336         ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
3337         ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);
3338         ocfs2_nfs_sync_lock_init(osb);
3339         ocfs2_orphan_scan_lock_res_init(&osb->osb_orphan_scan.os_lockres, osb);
3340
3341         osb->cconn = conn;
3342 bail:
3343         if (status < 0) {
3344                 ocfs2_dlm_shutdown_debug(osb);
3345                 if (osb->dc_task)
3346                         kthread_stop(osb->dc_task);
3347         }
3348
3349         return status;
3350 }
3351
3352 void ocfs2_dlm_shutdown(struct ocfs2_super *osb,
3353                         int hangup_pending)
3354 {
3355         ocfs2_drop_osb_locks(osb);
3356
3357         /*
3358          * Now that we have dropped all locks and ocfs2_dismount_volume()
3359          * has disabled recovery, the DLM won't be talking to us.  It's
3360          * safe to tear things down before disconnecting the cluster.
3361          */
3362
3363         if (osb->dc_task) {
3364                 kthread_stop(osb->dc_task);
3365                 osb->dc_task = NULL;
3366         }
3367
3368         ocfs2_lock_res_free(&osb->osb_super_lockres);
3369         ocfs2_lock_res_free(&osb->osb_rename_lockres);
3370         ocfs2_lock_res_free(&osb->osb_nfs_sync_lockres);
3371         ocfs2_lock_res_free(&osb->osb_orphan_scan.os_lockres);
3372
3373         ocfs2_cluster_disconnect(osb->cconn, hangup_pending);
3374         osb->cconn = NULL;
3375
3376         ocfs2_dlm_shutdown_debug(osb);
3377 }
3378
3379 static int ocfs2_drop_lock(struct ocfs2_super *osb,
3380                            struct ocfs2_lock_res *lockres)
3381 {
3382         int ret;
3383         unsigned long flags;
3384         u32 lkm_flags = 0;
3385
3386         /* We didn't get anywhere near actually using this lockres. */
3387         if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED))
3388                 goto out;
3389
3390         if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
3391                 lkm_flags |= DLM_LKF_VALBLK;
3392
3393         spin_lock_irqsave(&lockres->l_lock, flags);
3394
3395         mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING),
3396                         "lockres %s, flags 0x%lx\n",
3397                         lockres->l_name, lockres->l_flags);
3398
3399         while (lockres->l_flags & OCFS2_LOCK_BUSY) {
3400                 mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = "
3401                      "%u, unlock_action = %u\n",
3402                      lockres->l_name, lockres->l_flags, lockres->l_action,
3403                      lockres->l_unlock_action);
3404
3405                 spin_unlock_irqrestore(&lockres->l_lock, flags);
3406
3407                 /* XXX: Today we just wait on any busy
3408                  * locks... Perhaps we need to cancel converts in the
3409                  * future? */
3410                 ocfs2_wait_on_busy_lock(lockres);
3411
3412                 spin_lock_irqsave(&lockres->l_lock, flags);
3413         }
3414
3415         if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
3416                 if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
3417                     lockres->l_level == DLM_LOCK_EX &&
3418                     !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
3419                         lockres->l_ops->set_lvb(lockres);
3420         }
3421
3422         if (lockres->l_flags & OCFS2_LOCK_BUSY)
3423                 mlog(ML_ERROR, "destroying busy lock: \"%s\"\n",
3424                      lockres->l_name);
3425         if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
3426                 mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name);
3427
3428         if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
3429                 spin_unlock_irqrestore(&lockres->l_lock, flags);
3430                 goto out;
3431         }
3432
3433         lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED);
3434
3435         /* make sure we never get here while waiting for an ast to
3436          * fire. */
3437         BUG_ON(lockres->l_action != OCFS2_AST_INVALID);
3438
3439         /* is this necessary? */
3440         lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
3441         lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK;
3442         spin_unlock_irqrestore(&lockres->l_lock, flags);
3443
3444         mlog(0, "lock %s\n", lockres->l_name);
3445
3446         ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags);
3447         if (ret) {
3448                 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
3449                 mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
3450                 ocfs2_dlm_dump_lksb(&lockres->l_lksb);
3451                 BUG();
3452         }
3453         mlog(0, "lock %s, successful return from ocfs2_dlm_unlock\n",
3454              lockres->l_name);
3455
3456         ocfs2_wait_on_busy_lock(lockres);
3457 out:
3458         return 0;
3459 }
3460
3461 static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
3462                                        struct ocfs2_lock_res *lockres);
3463
3464 /* Mark the lockres as being dropped. It will no longer be
3465  * queued if blocking, but we still may have to wait on it
3466  * being dequeued from the downconvert thread before we can consider
3467  * it safe to drop.
3468  *
3469  * You can *not* attempt to call cluster_lock on this lockres anymore. */
3470 void ocfs2_mark_lockres_freeing(struct ocfs2_super *osb,
3471                                 struct ocfs2_lock_res *lockres)
3472 {
3473         int status;
3474         struct ocfs2_mask_waiter mw;
3475         unsigned long flags, flags2;
3476
3477         ocfs2_init_mask_waiter(&mw);
3478
3479         spin_lock_irqsave(&lockres->l_lock, flags);
3480         lockres->l_flags |= OCFS2_LOCK_FREEING;
3481         if (lockres->l_flags & OCFS2_LOCK_QUEUED && current == osb->dc_task) {
3482                 /*
3483                  * We know the downconvert is queued but not in progress
3484                  * because we are the downconvert thread and are processing
3485                  * a different lock. So we can just remove the lock from the
3486                  * queue. This is not only an optimization but also a way
3487                  * to avoid the following deadlock:
3488                  *   ocfs2_dentry_post_unlock()
3489                  *     ocfs2_dentry_lock_put()
3490                  *       ocfs2_drop_dentry_lock()
3491                  *         iput()
3492                  *           ocfs2_evict_inode()
3493                  *             ocfs2_clear_inode()
3494                  *               ocfs2_mark_lockres_freeing()
3495                  *                 ... blocks waiting for OCFS2_LOCK_QUEUED
3496                  *                 since we are the downconvert thread which
3497                  *                 should clear the flag.
3498                  */
3499                 spin_unlock_irqrestore(&lockres->l_lock, flags);
3500                 spin_lock_irqsave(&osb->dc_task_lock, flags2);
3501                 list_del_init(&lockres->l_blocked_list);
3502                 osb->blocked_lock_count--;
3503                 spin_unlock_irqrestore(&osb->dc_task_lock, flags2);
3504                 /*
3505                  * Warn if we recurse into another post_unlock call.  Strictly
3506                  * speaking it isn't a problem but we need to be careful if
3507                  * that happens (stack overflow, deadlocks, ...) so warn if
3508                  * ocfs2 grows a path for which this can happen.
3509                  */
3510                 WARN_ON_ONCE(lockres->l_ops->post_unlock);
3511                 /* Since the lock is being freed we don't do much in the fn below */
3512                 ocfs2_process_blocked_lock(osb, lockres);
3513                 return;
3514         }
3515         while (lockres->l_flags & OCFS2_LOCK_QUEUED) {
3516                 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0);
3517                 spin_unlock_irqrestore(&lockres->l_lock, flags);
3518
3519                 mlog(0, "Waiting on lockres %s\n", lockres->l_name);
3520
3521                 status = ocfs2_wait_for_mask(&mw);
3522                 if (status)
3523                         mlog_errno(status);
3524
3525                 spin_lock_irqsave(&lockres->l_lock, flags);
3526         }
3527         spin_unlock_irqrestore(&lockres->l_lock, flags);
3528 }
3529
3530 void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
3531                                struct ocfs2_lock_res *lockres)
3532 {
3533         int ret;
3534
3535         ocfs2_mark_lockres_freeing(osb, lockres);
3536         ret = ocfs2_drop_lock(osb, lockres);
3537         if (ret)
3538                 mlog_errno(ret);
3539 }
3540
3541 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
3542 {
3543         ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
3544         ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
3545         ocfs2_simple_drop_lockres(osb, &osb->osb_nfs_sync_lockres);
3546         ocfs2_simple_drop_lockres(osb, &osb->osb_orphan_scan.os_lockres);
3547 }
3548
3549 int ocfs2_drop_inode_locks(struct inode *inode)
3550 {
3551         int status, err;
3552
3553         /* No need to call ocfs2_mark_lockres_freeing here -
3554          * ocfs2_clear_inode has done it for us. */
3555
3556         err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
3557                               &OCFS2_I(inode)->ip_open_lockres);
3558         if (err < 0)
3559                 mlog_errno(err);
3560
3561         status = err;
3562
3563         err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
3564                               &OCFS2_I(inode)->ip_inode_lockres);
3565         if (err < 0)
3566                 mlog_errno(err);
3567         if (err < 0 && !status)
3568                 status = err;
3569
3570         err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
3571                               &OCFS2_I(inode)->ip_rw_lockres);
3572         if (err < 0)
3573                 mlog_errno(err);
3574         if (err < 0 && !status)
3575                 status = err;
3576
3577         return status;
3578 }
3579
3580 static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
3581                                               int new_level)
3582 {
3583         assert_spin_locked(&lockres->l_lock);
3584
3585         BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);
3586
3587         if (lockres->l_level <= new_level) {
3588                 mlog(ML_ERROR, "lockres %s, lvl %d <= %d, blcklst %d, mask %d, "
3589                      "type %d, flags 0x%lx, hold %d %d, act %d %d, req %d, "
3590                      "block %d, pgen %d\n", lockres->l_name, lockres->l_level,
3591                      new_level, list_empty(&lockres->l_blocked_list),
3592                      list_empty(&lockres->l_mask_waiters), lockres->l_type,
3593                      lockres->l_flags, lockres->l_ro_holders,
3594                      lockres->l_ex_holders, lockres->l_action,
3595                      lockres->l_unlock_action, lockres->l_requested,
3596                      lockres->l_blocking, lockres->l_pending_gen);
3597                 BUG();
3598         }
3599
3600         mlog(ML_BASTS, "lockres %s, level %d => %d, blocking %d\n",
3601              lockres->l_name, lockres->l_level, new_level, lockres->l_blocking);
3602
3603         lockres->l_action = OCFS2_AST_DOWNCONVERT;
3604         lockres->l_requested = new_level;
3605         lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
3606         return lockres_set_pending(lockres);
3607 }
3608
3609 static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
3610                                   struct ocfs2_lock_res *lockres,
3611                                   int new_level,
3612                                   int lvb,
3613                                   unsigned int generation)
3614 {
3615         int ret;
3616         u32 dlm_flags = DLM_LKF_CONVERT;
3617
3618         mlog(ML_BASTS, "lockres %s, level %d => %d\n", lockres->l_name,
3619              lockres->l_level, new_level);
3620
3621         /*
3622          * Regarding DLM_LKF_VALBLK, fsdlm behaves differently from o2cb: it
3623          * always expects DLM_LKF_VALBLK to be set if the LKB has an LVB, so
3624          * that we can recover correctly from node failure. Otherwise we may
3625          * get an invalid LVB in the LKB without DLM_SBF_VALNOTVALID being set.
3626          */
3627         if (ocfs2_userspace_stack(osb) &&
3628             lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
3629                 lvb = 1;
3630
3631         if (lvb)
3632                 dlm_flags |= DLM_LKF_VALBLK;
3633
3634         ret = ocfs2_dlm_lock(osb->cconn,
3635                              new_level,
3636                              &lockres->l_lksb,
3637                              dlm_flags,
3638                              lockres->l_name,
3639                              OCFS2_LOCK_ID_MAX_LEN - 1);
3640         lockres_clear_pending(lockres, generation, osb);
3641         if (ret) {
3642                 ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
3643                 ocfs2_recover_from_dlm_error(lockres, 1);
3644                 goto bail;
3645         }
3646
3647         ret = 0;
3648 bail:
3649         return ret;
3650 }
3651
3652 /* returns 1 when the caller should unlock and call ocfs2_dlm_unlock */
3653 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
3654                                         struct ocfs2_lock_res *lockres)
3655 {
3656         assert_spin_locked(&lockres->l_lock);
3657
3658         if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
3659                 /* If we're already trying to cancel a lock conversion
3660                  * then just drop the spinlock and allow the caller to
3661                  * requeue this lock. */
3662                 mlog(ML_BASTS, "lockres %s, skip convert\n", lockres->l_name);
3663                 return 0;
3664         }
3665
3666         /* were we in a convert when we got the bast fire? */
3667         BUG_ON(lockres->l_action != OCFS2_AST_CONVERT &&
3668                lockres->l_action != OCFS2_AST_DOWNCONVERT);
3669         /* set things up for the unlockast to know to just
3670          * clear out the ast_action and unset busy, etc. */
3671         lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT;
3672
3673         mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY),
3674                         "lock %s, invalid flags: 0x%lx\n",
3675                         lockres->l_name, lockres->l_flags);
3676
3677         mlog(ML_BASTS, "lockres %s\n", lockres->l_name);
3678
3679         return 1;
3680 }
3681
3682 static int ocfs2_cancel_convert(struct ocfs2_super *osb,
3683                                 struct ocfs2_lock_res *lockres)
3684 {
3685         int ret;
3686
3687         ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb,
3688                                DLM_LKF_CANCEL);
3689         if (ret) {
3690                 ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
3691                 ocfs2_recover_from_dlm_error(lockres, 0);
3692         }
3693
3694         mlog(ML_BASTS, "lockres %s\n", lockres->l_name);
3695
3696         return ret;
3697 }
3698
3699 static int ocfs2_unblock_lock(struct ocfs2_super *osb,
3700                               struct ocfs2_lock_res *lockres,
3701                               struct ocfs2_unblock_ctl *ctl)
3702 {
3703         unsigned long flags;
3704         int blocking;
3705         int new_level;
3706         int level;
3707         int ret = 0;
3708         int set_lvb = 0;
3709         unsigned int gen;
3710
3711         spin_lock_irqsave(&lockres->l_lock, flags);
3712
3713 recheck:
3714         /*
3715          * Is it still blocking? If not, we have no more work to do.
3716          */
3717         if (!(lockres->l_flags & OCFS2_LOCK_BLOCKED)) {
3718                 BUG_ON(lockres->l_blocking != DLM_LOCK_NL);
3719                 spin_unlock_irqrestore(&lockres->l_lock, flags);
3720                 ret = 0;
3721                 goto leave;
3722         }
3723
3724         if (lockres->l_flags & OCFS2_LOCK_BUSY) {
3725                 /* XXX
3726                  * This is a *big* race.  The OCFS2_LOCK_PENDING flag
3727                  * exists entirely for one reason - another thread has set
3728                  * OCFS2_LOCK_BUSY, but has *NOT* yet called dlm_lock().
3729                  *
3730                  * If we do ocfs2_cancel_convert() before the other thread
3731                  * calls dlm_lock(), our cancel will do nothing.  We will
3732                  * get no ast, and we will have no way of knowing the
3733                  * cancel failed.  Meanwhile, the other thread will call
3734                  * into dlm_lock() and wait...forever.
3735                  *
3736                  * Why forever?  Because another node has asked for the
3737                  * lock first; that's why we're here in unblock_lock().
3738                  *
3739                  * The solution is OCFS2_LOCK_PENDING.  When PENDING is
3740                  * set, we just requeue the unblock.  Only when the other
3741                  * thread has called dlm_lock() and cleared PENDING will
3742                  * we then cancel their request.
3743                  *
3744                  * All callers of dlm_lock() must set OCFS2_LOCK_PENDING
3745                  * at the same time they set OCFS2_LOCK_BUSY.  They must
3746                  * clear OCFS2_LOCK_PENDING after dlm_lock() returns.
3747                  */
3748                 if (lockres->l_flags & OCFS2_LOCK_PENDING) {
3749                         mlog(ML_BASTS, "lockres %s, ReQ: Pending\n",
3750                              lockres->l_name);
3751                         goto leave_requeue;
3752                 }
3753
3754                 ctl->requeue = 1;
3755                 ret = ocfs2_prepare_cancel_convert(osb, lockres);
3756                 spin_unlock_irqrestore(&lockres->l_lock, flags);
3757                 if (ret) {
3758                         ret = ocfs2_cancel_convert(osb, lockres);
3759                         if (ret < 0)
3760                                 mlog_errno(ret);
3761                 }
3762                 goto leave;
3763         }
3764
3765         /*
3766          * This prevents livelocks. OCFS2_LOCK_UPCONVERT_FINISHING flag is
3767          * set when the ast is received for an upconvert just before the
3768          * OCFS2_LOCK_BUSY flag is cleared. Now if the fs received a bast
3769          * on the heels of the ast, we want to delay the downconvert just
3770          * enough to allow the up requestor to do its task. Because this
3771          * lock is in the blocked queue, the lock will be downconverted
3772          * as soon as the requestor is done with the lock.
3773          */
3774         if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING)
3775                 goto leave_requeue;
3776
3777         /*
3778          * How can we block and yet be at NL?  We were trying to upconvert
3779          * from NL and got canceled.  The code comes back here, and now
3780          * we notice and clear BLOCKING.
3781          */
3782         if (lockres->l_level == DLM_LOCK_NL) {
3783                 BUG_ON(lockres->l_ex_holders || lockres->l_ro_holders);
3784                 mlog(ML_BASTS, "lockres %s, Aborting dc\n", lockres->l_name);
3785                 lockres->l_blocking = DLM_LOCK_NL;
3786                 lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
3787                 spin_unlock_irqrestore(&lockres->l_lock, flags);
3788                 goto leave;
3789         }
3790
3791         /* if we're blocking an exclusive and we have *any* holders,
3792          * then requeue. */
3793         if ((lockres->l_blocking == DLM_LOCK_EX)
3794             && (lockres->l_ex_holders || lockres->l_ro_holders)) {
3795                 mlog(ML_BASTS, "lockres %s, ReQ: EX/PR Holders %u,%u\n",
3796                      lockres->l_name, lockres->l_ex_holders,
3797                      lockres->l_ro_holders);
3798                 goto leave_requeue;
3799         }
3800
3801         /* If it's a PR we're blocking, then only
3802          * requeue if we've got any EX holders */
3803         if (lockres->l_blocking == DLM_LOCK_PR &&
3804             lockres->l_ex_holders) {
3805                 mlog(ML_BASTS, "lockres %s, ReQ: EX Holders %u\n",
3806                      lockres->l_name, lockres->l_ex_holders);
3807                 goto leave_requeue;
3808         }
3809
3810         /*
3811          * Can we get a lock in this state if the holder counts are
3812          * zero? The meta data unblock code used to check this.
3813          */
3814         if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
3815             && (lockres->l_flags & OCFS2_LOCK_REFRESHING)) {
3816                 mlog(ML_BASTS, "lockres %s, ReQ: Lock Refreshing\n",
3817                      lockres->l_name);
3818                 goto leave_requeue;
3819         }
3820
3821         new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);
3822
3823         if (lockres->l_ops->check_downconvert
3824             && !lockres->l_ops->check_downconvert(lockres, new_level)) {
3825                 mlog(ML_BASTS, "lockres %s, ReQ: Checkpointing\n",
3826                      lockres->l_name);
3827                 goto leave_requeue;
3828         }
3829
3830         /* If we get here, then we know that there are no more
3831          * incompatible holders (and anyone asking for an incompatible
3832          * lock is blocked). We can now downconvert the lock */
3833         if (!lockres->l_ops->downconvert_worker)
3834                 goto downconvert;
3835
3836         /* Some lockres types want to do a bit of work before
3837          * downconverting a lock. Allow that here. The worker function
3838          * may sleep, so we save off a copy of what we're blocking as
3839          * it may change while we're not holding the spin lock. */
3840         blocking = lockres->l_blocking;
3841         level = lockres->l_level;
3842         spin_unlock_irqrestore(&lockres->l_lock, flags);
3843
3844         ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking);
3845
3846         if (ctl->unblock_action == UNBLOCK_STOP_POST) {
3847                 mlog(ML_BASTS, "lockres %s, UNBLOCK_STOP_POST\n",
3848                      lockres->l_name);
3849                 goto leave;
3850         }
3851
3852         spin_lock_irqsave(&lockres->l_lock, flags);
3853         if ((blocking != lockres->l_blocking) || (level != lockres->l_level)) {
3854                 /* If this changed underneath us, then we can't drop
3855                  * it just yet. */
3856                 mlog(ML_BASTS, "lockres %s, block=%d:%d, level=%d:%d, "
3857                      "Recheck\n", lockres->l_name, blocking,
3858                      lockres->l_blocking, level, lockres->l_level);
3859                 goto recheck;
3860         }
3861
3862 downconvert:
3863         ctl->requeue = 0;
3864
3865         if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
3866                 if (lockres->l_level == DLM_LOCK_EX)
3867                         set_lvb = 1;
3868
3869                 /*
3870                  * We only set the lvb if the lock has been fully
3871                  * refreshed - otherwise we risk setting stale data.
3872                  * If it hasn't been, there's no need to actually clear
3873                  * out the lvb here as its value is still valid.
3874                  */
3875                 if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
3876                         lockres->l_ops->set_lvb(lockres);
3877         }
3878
3879         gen = ocfs2_prepare_downconvert(lockres, new_level);
3880         spin_unlock_irqrestore(&lockres->l_lock, flags);
3881         ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb,
3882                                      gen);
3883
3884 leave:
3885         if (ret)
3886                 mlog_errno(ret);
3887         return ret;
3888
3889 leave_requeue:
3890         spin_unlock_irqrestore(&lockres->l_lock, flags);
3891         ctl->requeue = 1;
3892
3893         return 0;
3894 }
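
/*
 * Summary of the ladder above (descriptive only): a blocked lockres is
 * requeued while a conversion is still BUSY or PENDING, while an
 * upconvert is finishing, while incompatible holders remain, while a
 * refresh is in flight, or while ->check_downconvert() refuses (e.g.
 * journal data not yet checkpointed). Only once all of those clear do
 * we run ->downconvert_worker() and issue the actual downconvert.
 */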
3895
3896 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
3897                                      int blocking)
3898 {
3899         struct inode *inode;
3900         struct address_space *mapping;
3901         struct ocfs2_inode_info *oi;
3902
3903         inode = ocfs2_lock_res_inode(lockres);
3904         mapping = inode->i_mapping;
3905
3906         if (S_ISDIR(inode->i_mode)) {
3907                 oi = OCFS2_I(inode);
3908                 oi->ip_dir_lock_gen++;
3909                 mlog(0, "generation: %u\n", oi->ip_dir_lock_gen);
3910                 goto out_forget;
3911         }
3912
3913         if (!S_ISREG(inode->i_mode))
3914                 goto out;
3915
3916         /*
3917          * We need this before the filemap_fdatawrite() so that it can
3918          * transfer the dirty bit from the PTE to the
3919          * page. Unfortunately this means that even for EX->PR
3920          * downconverts, we'll lose our mappings and have to build
3921          * them up again.
3922          */
3923         unmap_mapping_range(mapping, 0, 0, 0);
3924
3925         if (filemap_fdatawrite(mapping)) {
3926                 mlog(ML_ERROR, "Could not sync inode %llu for downconvert!",
3927                      (unsigned long long)OCFS2_I(inode)->ip_blkno);
3928         }
3929         sync_mapping_buffers(mapping);
3930         if (blocking == DLM_LOCK_EX) {
3931                 truncate_inode_pages(mapping, 0);
3932         } else {
3933                 /* We only need to wait on the I/O if we're not also
3934                  * truncating pages because truncate_inode_pages waits
3935                  * for us above. We don't truncate pages if we're
3936                  * blocking anything < EXMODE because we want to keep
3937                  * them around in that case. */
3938                 filemap_fdatawait(mapping);
3939         }
3940
3941 out_forget:
3942         forget_all_cached_acls(inode);
3943
3944 out:
3945         return UNBLOCK_CONTINUE;
3946 }
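
/*
 * Net effect of the worker above (descriptive): directories just bump
 * ip_dir_lock_gen and drop cached ACLs; regular files are unmapped and
 * written back, their ACL cache dropped, and - only when blocking an
 * EX request - the page cache is truncated too, since another node may
 * rewrite the data underneath us.
 */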
3947
3948 static int ocfs2_ci_checkpointed(struct ocfs2_caching_info *ci,
3949                                  struct ocfs2_lock_res *lockres,
3950                                  int new_level)
3951 {
3952         int checkpointed = ocfs2_ci_fully_checkpointed(ci);
3953
3954         BUG_ON(new_level != DLM_LOCK_NL && new_level != DLM_LOCK_PR);
3955         BUG_ON(lockres->l_level != DLM_LOCK_EX && !checkpointed);
3956
3957         if (checkpointed)
3958                 return 1;
3959
3960         ocfs2_start_checkpoint(OCFS2_SB(ocfs2_metadata_cache_get_super(ci)));
3961         return 0;
3962 }
3963
3964 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
3965                                         int new_level)
3966 {
3967         struct inode *inode = ocfs2_lock_res_inode(lockres);
3968
3969         return ocfs2_ci_checkpointed(INODE_CACHE(inode), lockres, new_level);
3970 }
3971
3972 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres)
3973 {
3974         struct inode *inode = ocfs2_lock_res_inode(lockres);
3975
3976         __ocfs2_stuff_meta_lvb(inode);
3977 }
3978
3979 /*
3980  * Does the final reference drop on our dentry lock. Right now this
3981  * happens in the downconvert thread, but we could choose to simplify the
3982  * dlmglue API and push these off to the ocfs2_wq in the future.
3983  */
3984 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
3985                                      struct ocfs2_lock_res *lockres)
3986 {
3987         struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
3988         ocfs2_dentry_lock_put(osb, dl);
3989 }
3990
3991 /*
3992  * d_delete() matching dentries before the lock downconvert.
3993  *
3994  * At this point, any process waiting to destroy the
3995  * dentry_lock due to last ref count is stopped by the
3996  * OCFS2_LOCK_QUEUED flag.
3997  *
3998  * We have two potential problems
3999  *
4000  * 1) If we do the last reference drop on our dentry_lock (via dput)
4001  *    we'll wind up in ocfs2_release_dentry_lock(), waiting on
4002  *    the downconvert to finish. Instead we take an elevated
4003  *    reference and push the drop until after we've completed our
4004  *    unblock processing.
4005  *
4006  * 2) There might be another process with a final reference,
4007  *    waiting on us to finish processing. If this is the case, we
4008  *    detect it and exit out - there are no more dentries anyway.
4009  */
4010 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
4011                                        int blocking)
4012 {
4013         struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
4014         struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
4015         struct dentry *dentry;
4016         unsigned long flags;
4017         int extra_ref = 0;
4018
4019         /*
4020          * This node is blocking another node from getting a read
4021          * lock. This happens when we've renamed within a
4022          * directory. We've forced the other nodes to d_delete(), but
4023          * we never actually dropped our lock because it's still
4024          * valid. The downconvert code will retain a PR for this node,
4025          * so there's no further work to do.
4026          */
4027         if (blocking == DLM_LOCK_PR)
4028                 return UNBLOCK_CONTINUE;
4029
4030         /*
4031          * Mark this inode as potentially orphaned. The code in
4032          * ocfs2_delete_inode() will figure out whether it actually
4033          * needs to be freed or not.
4034          */
4035         spin_lock(&oi->ip_lock);
4036         oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
4037         spin_unlock(&oi->ip_lock);
4038
4039         /*
4040          * Yuck. We need to make sure however that the check of
4041          * OCFS2_LOCK_FREEING and the extra reference are atomic with
4042          * respect to a reference decrement or the setting of that
4043          * flag.
4044          */
4045         spin_lock_irqsave(&lockres->l_lock, flags);
4046         spin_lock(&dentry_attach_lock);
4047         if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
4048             && dl->dl_count) {
4049                 dl->dl_count++;
4050                 extra_ref = 1;
4051         }
4052         spin_unlock(&dentry_attach_lock);
4053         spin_unlock_irqrestore(&lockres->l_lock, flags);
4054
4055         mlog(0, "extra_ref = %d\n", extra_ref);
4056
4057         /*
4058          * We have a process waiting on us in ocfs2_dentry_iput(),
4059          * which means we can't have any more outstanding
4060          * aliases. There's no need to do any more work.
4061          */
4062         if (!extra_ref)
4063                 return UNBLOCK_CONTINUE;
4064
4065         spin_lock(&dentry_attach_lock);
4066         while (1) {
4067                 dentry = ocfs2_find_local_alias(dl->dl_inode,
4068                                                 dl->dl_parent_blkno, 1);
4069                 if (!dentry)
4070                         break;
4071                 spin_unlock(&dentry_attach_lock);
4072
4073                 if (S_ISDIR(dl->dl_inode->i_mode))
4074                         shrink_dcache_parent(dentry);
4075
4076                 mlog(0, "d_delete(%pd);\n", dentry);
4077
4078                 /*
4079                  * The following dcache calls may do an
4080                  * iput(). Normally we don't want that from the
4081                  * downconverting thread, but in this case it's ok
4082                  * because the requesting node already has an
4083                  * exclusive lock on the inode, so it can't be queued
4084                  * for a downconvert.
4085                  */
4086                 d_delete(dentry);
4087                 dput(dentry);
4088
4089                 spin_lock(&dentry_attach_lock);
4090         }
4091         spin_unlock(&dentry_attach_lock);
4092
4093         /*
4094          * If we are the last holder of this dentry lock, there is no
4095          * reason to downconvert so skip straight to the unlock.
4096          */
4097         if (dl->dl_count == 1)
4098                 return UNBLOCK_STOP_POST;
4099
4100         return UNBLOCK_CONTINUE_POST;
4101 }
4102
4103 static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres,
4104                                             int new_level)
4105 {
4106         struct ocfs2_refcount_tree *tree =
4107                                 ocfs2_lock_res_refcount_tree(lockres);
4108
4109         return ocfs2_ci_checkpointed(&tree->rf_ci, lockres, new_level);
4110 }
4111
4112 static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres,
4113                                          int blocking)
4114 {
4115         struct ocfs2_refcount_tree *tree =
4116                                 ocfs2_lock_res_refcount_tree(lockres);
4117
4118         ocfs2_metadata_cache_purge(&tree->rf_ci);
4119
4120         return UNBLOCK_CONTINUE;
4121 }
4122
4123 static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres)
4124 {
4125         struct ocfs2_qinfo_lvb *lvb;
4126         struct ocfs2_mem_dqinfo *oinfo = ocfs2_lock_res_qinfo(lockres);
4127         struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb,
4128                                             oinfo->dqi_gi.dqi_type);
4129
4130         lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
4131         lvb->lvb_version = OCFS2_QINFO_LVB_VERSION;
4132         lvb->lvb_bgrace = cpu_to_be32(info->dqi_bgrace);
4133         lvb->lvb_igrace = cpu_to_be32(info->dqi_igrace);
4134         lvb->lvb_syncms = cpu_to_be32(oinfo->dqi_syncms);
4135         lvb->lvb_blocks = cpu_to_be32(oinfo->dqi_gi.dqi_blocks);
4136         lvb->lvb_free_blk = cpu_to_be32(oinfo->dqi_gi.dqi_free_blk);
4137         lvb->lvb_free_entry = cpu_to_be32(oinfo->dqi_gi.dqi_free_entry);
4138 }

void ocfs2_qinfo_unlock(struct ocfs2_mem_dqinfo *oinfo, int ex)
{
        struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
        struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb);
        int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;

        if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb))
                ocfs2_cluster_unlock(osb, lockres, level);
}

static int ocfs2_refresh_qinfo(struct ocfs2_mem_dqinfo *oinfo)
{
        struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb,
                                            oinfo->dqi_gi.dqi_type);
        struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
        struct ocfs2_qinfo_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
        struct buffer_head *bh = NULL;
        struct ocfs2_global_disk_dqinfo *gdinfo;
        int status = 0;

        if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) &&
            lvb->lvb_version == OCFS2_QINFO_LVB_VERSION) {
                info->dqi_bgrace = be32_to_cpu(lvb->lvb_bgrace);
                info->dqi_igrace = be32_to_cpu(lvb->lvb_igrace);
                oinfo->dqi_syncms = be32_to_cpu(lvb->lvb_syncms);
                oinfo->dqi_gi.dqi_blocks = be32_to_cpu(lvb->lvb_blocks);
                oinfo->dqi_gi.dqi_free_blk = be32_to_cpu(lvb->lvb_free_blk);
                oinfo->dqi_gi.dqi_free_entry =
                                        be32_to_cpu(lvb->lvb_free_entry);
        } else {
                status = ocfs2_read_quota_phys_block(oinfo->dqi_gqinode,
                                                     oinfo->dqi_giblk, &bh);
                if (status) {
                        mlog_errno(status);
                        goto bail;
                }
                gdinfo = (struct ocfs2_global_disk_dqinfo *)
                                        (bh->b_data + OCFS2_GLOBAL_INFO_OFF);
                info->dqi_bgrace = le32_to_cpu(gdinfo->dqi_bgrace);
                info->dqi_igrace = le32_to_cpu(gdinfo->dqi_igrace);
                oinfo->dqi_syncms = le32_to_cpu(gdinfo->dqi_syncms);
                oinfo->dqi_gi.dqi_blocks = le32_to_cpu(gdinfo->dqi_blocks);
                oinfo->dqi_gi.dqi_free_blk = le32_to_cpu(gdinfo->dqi_free_blk);
                oinfo->dqi_gi.dqi_free_entry =
                                        le32_to_cpu(gdinfo->dqi_free_entry);
                brelse(bh);
                ocfs2_track_lock_refresh(lockres);
        }

bail:
        return status;
}
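
/*
 * A hedged sketch of the generic LVB refresh pattern that
 * ocfs2_refresh_qinfo() instantiates: trust the lock value block when
 * the DLM reports it valid and the version matches, otherwise fall
 * back to the authoritative on-disk copy.  version_ok(),
 * copy_from_lvb() and read_from_disk() below are hypothetical helpers
 * used only to show the shape of the pattern.
 *
 *      if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) && version_ok(lvb)) {
 *              copy_from_lvb();
 *      } else {
 *              status = read_from_disk();
 *              if (!status)
 *                      ocfs2_track_lock_refresh(lockres);
 *      }
 */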

/* Lock quota info. This function expects at least a shared lock on the
 * quota file so that we can safely refresh the quota info from disk. */
int ocfs2_qinfo_lock(struct ocfs2_mem_dqinfo *oinfo, int ex)
{
        struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
        struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb);
        int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
        int status = 0;

        /* On RO devices, locking really isn't needed... */
        if (ocfs2_is_hard_readonly(osb)) {
                if (ex)
                        status = -EROFS;
                goto bail;
        }
        if (ocfs2_mount_local(osb))
                goto bail;

        status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
        if (status < 0) {
                mlog_errno(status);
                goto bail;
        }
        if (!ocfs2_should_refresh_lock_res(lockres))
                goto bail;
        /* OK, we have the lock but we need to refresh the quota info */
        status = ocfs2_refresh_qinfo(oinfo);
        if (status)
                ocfs2_qinfo_unlock(oinfo, ex);
        ocfs2_complete_lock_res_refresh(lockres, status);
bail:
        return status;
}
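
/*
 * Hedged usage sketch for the pair above.  Note the asymmetry handled
 * inside ocfs2_qinfo_lock(): when the refresh fails the lock has
 * already been dropped, so on error the caller must not unlock again.
 * The real callers live in the quota code.
 *
 *      status = ocfs2_qinfo_lock(oinfo, ex);
 *      if (status < 0)
 *              return status;  // lock already released on failure
 *      ...operate on the global quota file...
 *      ocfs2_qinfo_unlock(oinfo, ex);
 */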

int ocfs2_refcount_lock(struct ocfs2_refcount_tree *ref_tree, int ex)
{
        int status;
        int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
        struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres;
        struct ocfs2_super *osb = lockres->l_priv;

        if (ocfs2_is_hard_readonly(osb))
                return -EROFS;

        if (ocfs2_mount_local(osb))
                return 0;

        status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
        if (status < 0)
                mlog_errno(status);

        return status;
}

void ocfs2_refcount_unlock(struct ocfs2_refcount_tree *ref_tree, int ex)
{
        int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
        struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres;
        struct ocfs2_super *osb = lockres->l_priv;

        if (!ocfs2_mount_local(osb))
                ocfs2_cluster_unlock(osb, lockres, level);
}
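
/*
 * Hedged usage sketch: refcount tree locking follows the same
 * take/operate/drop pairing as the other cluster locks in this file.
 * The 'ex' flag selects DLM_LOCK_EX over DLM_LOCK_PR in both calls,
 * so the lock and unlock levels must match.
 *
 *      status = ocfs2_refcount_lock(ref_tree, 1);      // 1 => exclusive
 *      if (status < 0)
 *              return status;
 *      ...modify the refcount tree...
 *      ocfs2_refcount_unlock(ref_tree, 1);
 */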

static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
                                       struct ocfs2_lock_res *lockres)
{
        int status;
        struct ocfs2_unblock_ctl ctl = {0, 0};
        unsigned long flags;

        /* Our reference to the lockres in this function can be
         * considered valid until we remove the OCFS2_LOCK_QUEUED
         * flag. */

        BUG_ON(!lockres);
        BUG_ON(!lockres->l_ops);

        mlog(ML_BASTS, "lockres %s blocked\n", lockres->l_name);

        /* Detect whether a lock has been marked as going away while
         * the downconvert thread was processing other things. A lock
         * can still be marked with OCFS2_LOCK_FREEING after this check,
         * but short-circuiting here still spares us some unnecessary
         * work. */
        spin_lock_irqsave(&lockres->l_lock, flags);
        if (lockres->l_flags & OCFS2_LOCK_FREEING)
                goto unqueue;
        spin_unlock_irqrestore(&lockres->l_lock, flags);

        status = ocfs2_unblock_lock(osb, lockres, &ctl);
        if (status < 0)
                mlog_errno(status);

        spin_lock_irqsave(&lockres->l_lock, flags);
unqueue:
        if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) {
                lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
        } else {
                ocfs2_schedule_blocked_lock(osb, lockres);
        }

        mlog(ML_BASTS, "lockres %s, requeue = %s.\n", lockres->l_name,
             ctl.requeue ? "yes" : "no");
        spin_unlock_irqrestore(&lockres->l_lock, flags);

        if (ctl.unblock_action != UNBLOCK_CONTINUE
            && lockres->l_ops->post_unlock)
                lockres->l_ops->post_unlock(osb, lockres);
}
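
/*
 * A minimal sketch of what the machinery above drives: a lock type's
 * ->downconvert_worker (hypothetical example, modelled on
 * ocfs2_refcount_convert_worker() earlier in this file) typically just
 * invalidates whatever it caches under the lock and lets the
 * downconvert continue.  Returning anything other than
 * UNBLOCK_CONTINUE makes ocfs2_process_blocked_lock() fire the lock
 * type's ->post_unlock() callback, if one is defined.
 *
 *      static int example_convert_worker(struct ocfs2_lock_res *lockres,
 *                                        int blocking)
 *      {
 *              // purge caches guarded by the lock being downconverted
 *              return UNBLOCK_CONTINUE;
 *      }
 */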

static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
                                        struct ocfs2_lock_res *lockres)
{
        unsigned long flags;

        assert_spin_locked(&lockres->l_lock);

        if (lockres->l_flags & OCFS2_LOCK_FREEING) {
                /* Do not schedule a lock for downconvert when it's on
                 * the way to destruction - any nodes wanting access
                 * to the resource will get it soon. */
                mlog(ML_BASTS, "lockres %s won't be scheduled: flags 0x%lx\n",
                     lockres->l_name, lockres->l_flags);
                return;
        }

        lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);

        spin_lock_irqsave(&osb->dc_task_lock, flags);
        if (list_empty(&lockres->l_blocked_list)) {
                list_add_tail(&lockres->l_blocked_list,
                              &osb->blocked_lock_list);
                osb->blocked_lock_count++;
        }
        spin_unlock_irqrestore(&osb->dc_task_lock, flags);
}
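
/*
 * Note on the list_empty() check above: it makes requeueing
 * idempotent.  A lockres already sitting on osb->blocked_lock_list is
 * not added twice, which keeps blocked_lock_count in step with the
 * actual list length that ocfs2_downconvert_thread_do_work() snapshots.
 */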

static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb)
{
        unsigned long processed;
        unsigned long flags;
        struct ocfs2_lock_res *lockres;

        spin_lock_irqsave(&osb->dc_task_lock, flags);
        /* grab this early so we know to try again if a state change and
         * wake-up happen part-way through our work */
        osb->dc_work_sequence = osb->dc_wake_sequence;

        processed = osb->blocked_lock_count;
        /*
         * Blocked lock processing in this loop might call iput(), which
         * can remove items from osb->blocked_lock_list. Downconvert up
         * to 'processed' locks, but stop short if some were removed by
         * ocfs2_mark_lockres_freeing() while we were downconverting.
         */
        while (processed && !list_empty(&osb->blocked_lock_list)) {
                lockres = list_entry(osb->blocked_lock_list.next,
                                     struct ocfs2_lock_res, l_blocked_list);
                list_del_init(&lockres->l_blocked_list);
                osb->blocked_lock_count--;
                spin_unlock_irqrestore(&osb->dc_task_lock, flags);

                BUG_ON(!processed);
                processed--;

                ocfs2_process_blocked_lock(osb, lockres);

                spin_lock_irqsave(&osb->dc_task_lock, flags);
        }
        spin_unlock_irqrestore(&osb->dc_task_lock, flags);
}

static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb)
{
        int empty = 0;
        unsigned long flags;

        spin_lock_irqsave(&osb->dc_task_lock, flags);
        if (list_empty(&osb->blocked_lock_list))
                empty = 1;

        spin_unlock_irqrestore(&osb->dc_task_lock, flags);
        return empty;
}

static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb)
{
        int should_wake = 0;
        unsigned long flags;

        spin_lock_irqsave(&osb->dc_task_lock, flags);
        if (osb->dc_work_sequence != osb->dc_wake_sequence)
                should_wake = 1;
        spin_unlock_irqrestore(&osb->dc_task_lock, flags);

        return should_wake;
}
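
/*
 * The two sequence counters implement a classic lost-wakeup guard:
 * ocfs2_wake_downconvert_thread() bumps dc_wake_sequence, while the
 * worker latches that value into dc_work_sequence before it starts
 * scanning.  A wake-up that arrives mid-scan leaves the counters
 * unequal, so the thread rescans instead of sleeping.  Sketch of the
 * ordering (both sides hold dc_task_lock around their counter access):
 *
 *      waker:                          worker:
 *      osb->dc_wake_sequence++;        osb->dc_work_sequence =
 *      wake_up(&osb->dc_event);                osb->dc_wake_sequence;
 *                                      ...process blocked list...
 *                                      // unequal counters => rescan
 */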

static int ocfs2_downconvert_thread(void *arg)
{
        int status = 0;
        struct ocfs2_super *osb = arg;

        /* only quit once we've been asked to stop and there is no more
         * work available */
        while (!(kthread_should_stop() &&
                ocfs2_downconvert_thread_lists_empty(osb))) {

                wait_event_interruptible(osb->dc_event,
                                         ocfs2_downconvert_thread_should_wake(osb) ||
                                         kthread_should_stop());

                mlog(0, "downconvert_thread: awoken\n");

                ocfs2_downconvert_thread_do_work(osb);
        }

        osb->dc_task = NULL;
        return status;
}
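
/*
 * Lifecycle sketch (hedged; the thread-name format and exact start/stop
 * sites are assumptions, not taken from this section): the thread is
 * created with kthread_run() and torn down with kthread_stop(), which
 * is why the loop above only exits once kthread_should_stop() is set
 * and the blocked list has drained.
 *
 *      osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb,
 *                                 "ocfs2dc-%s", osb->uuid_str);
 *      if (IS_ERR(osb->dc_task)) {
 *              // handle failure; dc_task must not keep an ERR_PTR
 *              osb->dc_task = NULL;
 *      }
 *      ...
 *      if (osb->dc_task)
 *              kthread_stop(osb->dc_task);
 */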

void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb)
{
        unsigned long flags;

        spin_lock_irqsave(&osb->dc_task_lock, flags);
        /* make sure the downconvert thread gets a swipe at whatever
         * changes the caller may have made to the lockres state */
        osb->dc_wake_sequence++;
        spin_unlock_irqrestore(&osb->dc_task_lock, flags);
        wake_up(&osb->dc_event);
}