// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/fs.h>
#include <linux/wait.h>
#include <linux/slab.h>
#include <linux/gfp.h>
#include <linux/sched.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/ratelimit.h>
#include <linux/bits.h>
#include <linux/ktime.h>

#include "super.h"
#include "mds_client.h"

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>

#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)

/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage.  Metadata is
 * partitioned hierarchically across a number of servers, and that
 * partition varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible for managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is an MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we send periodic heartbeat messages to ensure
 * any capabilities or leases we have been issued remain valid.  If
 * the session times out and goes stale, our leases and capabilities
 * are no longer valid.
 */

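/*
 * State carried while building a reconnect message for one session:
 * the pagelist accumulates the encoded payload, nr_caps/nr_realms
 * count the records added so far, and allow_multi indicates whether
 * the payload may be split across multiple reconnect messages.
 */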
struct ceph_reconnect_state {
        struct ceph_mds_session *session;
        int nr_caps, nr_realms;
        struct ceph_pagelist *pagelist;
        unsigned msg_version;
        bool allow_multi;
};

static void __wake_requests(struct ceph_mds_client *mdsc,
                            struct list_head *head);
static void ceph_cap_release_work(struct work_struct *work);
static void ceph_cap_reclaim_work(struct work_struct *work);

static const struct ceph_connection_operations mds_con_ops;


/*
 * mds reply parsing
 */
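
/*
 * Many reply fields use ceph's versioned encoding: a u8 struct
 * version, a u8 compat version, and a u32 payload length.  A decoder
 * that understands the compat version can use the length to skip any
 * trailing fields it does not know about.
 */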
static int parse_reply_info_quota(void **p, void *end,
                                  struct ceph_mds_reply_info_in *info)
{
        u8 struct_v, struct_compat;
        u32 struct_len;

        ceph_decode_8_safe(p, end, struct_v, bad);
        ceph_decode_8_safe(p, end, struct_compat, bad);
        /* struct_v is expected to be >= 1. we only
         * understand encoding with struct_compat == 1. */
        if (!struct_v || struct_compat != 1)
                goto bad;
        ceph_decode_32_safe(p, end, struct_len, bad);
        ceph_decode_need(p, end, struct_len, bad);
        end = *p + struct_len;
        ceph_decode_64_safe(p, end, info->max_bytes, bad);
        ceph_decode_64_safe(p, end, info->max_files, bad);
        *p = end;
        return 0;
bad:
        return -EIO;
}

/*
 * parse individual inode info
 */
static int parse_reply_info_in(void **p, void *end,
                               struct ceph_mds_reply_info_in *info,
                               u64 features)
{
        int err = 0;
        u8 struct_v = 0;

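        /*
         * A features value of (u64)-1 selects the newer self-describing
         * encoding (struct_v/struct_compat/struct_len header); otherwise
         * individual feature bits gate each optional field below.
         */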
        if (features == (u64)-1) {
                u32 struct_len;
                u8 struct_compat;
                ceph_decode_8_safe(p, end, struct_v, bad);
                ceph_decode_8_safe(p, end, struct_compat, bad);
                /* struct_v is expected to be >= 1. we only understand
                 * encoding with struct_compat == 1. */
                if (!struct_v || struct_compat != 1)
                        goto bad;
                ceph_decode_32_safe(p, end, struct_len, bad);
                ceph_decode_need(p, end, struct_len, bad);
                end = *p + struct_len;
        }

        ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
        info->in = *p;
        *p += sizeof(struct ceph_mds_reply_inode) +
                sizeof(*info->in->fragtree.splits) *
                le32_to_cpu(info->in->fragtree.nsplits);

        ceph_decode_32_safe(p, end, info->symlink_len, bad);
        ceph_decode_need(p, end, info->symlink_len, bad);
        info->symlink = *p;
        *p += info->symlink_len;

        ceph_decode_copy_safe(p, end, &info->dir_layout,
                              sizeof(info->dir_layout), bad);
        ceph_decode_32_safe(p, end, info->xattr_len, bad);
        ceph_decode_need(p, end, info->xattr_len, bad);
        info->xattr_data = *p;
        *p += info->xattr_len;

        if (features == (u64)-1) {
                /* inline data */
                ceph_decode_64_safe(p, end, info->inline_version, bad);
                ceph_decode_32_safe(p, end, info->inline_len, bad);
                ceph_decode_need(p, end, info->inline_len, bad);
                info->inline_data = *p;
                *p += info->inline_len;
                /* quota */
                err = parse_reply_info_quota(p, end, info);
                if (err < 0)
                        goto out_bad;
                /* pool namespace */
                ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
                if (info->pool_ns_len > 0) {
                        ceph_decode_need(p, end, info->pool_ns_len, bad);
                        info->pool_ns_data = *p;
                        *p += info->pool_ns_len;
                }

                /* btime */
                ceph_decode_need(p, end, sizeof(info->btime), bad);
                ceph_decode_copy(p, &info->btime, sizeof(info->btime));

                /* change attribute */
                ceph_decode_64_safe(p, end, info->change_attr, bad);

                /* dir pin */
                if (struct_v >= 2) {
                        ceph_decode_32_safe(p, end, info->dir_pin, bad);
                } else {
                        info->dir_pin = -ENODATA;
                }

                /* snapshot birth time, remains zero for v<=2 */
                if (struct_v >= 3) {
                        ceph_decode_need(p, end, sizeof(info->snap_btime), bad);
                        ceph_decode_copy(p, &info->snap_btime,
                                         sizeof(info->snap_btime));
                } else {
                        memset(&info->snap_btime, 0, sizeof(info->snap_btime));
                }

                *p = end;
        } else {
                if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
                        ceph_decode_64_safe(p, end, info->inline_version, bad);
                        ceph_decode_32_safe(p, end, info->inline_len, bad);
                        ceph_decode_need(p, end, info->inline_len, bad);
                        info->inline_data = *p;
                        *p += info->inline_len;
                } else
                        info->inline_version = CEPH_INLINE_NONE;

                if (features & CEPH_FEATURE_MDS_QUOTA) {
                        err = parse_reply_info_quota(p, end, info);
                        if (err < 0)
                                goto out_bad;
                } else {
                        info->max_bytes = 0;
                        info->max_files = 0;
                }

                info->pool_ns_len = 0;
                info->pool_ns_data = NULL;
                if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
                        ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
                        if (info->pool_ns_len > 0) {
                                ceph_decode_need(p, end, info->pool_ns_len, bad);
                                info->pool_ns_data = *p;
                                *p += info->pool_ns_len;
                        }
                }

                if (features & CEPH_FEATURE_FS_BTIME) {
                        ceph_decode_need(p, end, sizeof(info->btime), bad);
                        ceph_decode_copy(p, &info->btime, sizeof(info->btime));
                        ceph_decode_64_safe(p, end, info->change_attr, bad);
                }

                info->dir_pin = -ENODATA;
                /* info->snap_btime remains zero */
        }
        return 0;
bad:
        err = -EIO;
out_bad:
        return err;
}

static int parse_reply_info_dir(void **p, void *end,
                                struct ceph_mds_reply_dirfrag **dirfrag,
                                u64 features)
{
        if (features == (u64)-1) {
                u8 struct_v, struct_compat;
                u32 struct_len;
                ceph_decode_8_safe(p, end, struct_v, bad);
                ceph_decode_8_safe(p, end, struct_compat, bad);
                /* struct_v is expected to be >= 1. we only understand
                 * encoding with struct_compat == 1. */
                if (!struct_v || struct_compat != 1)
                        goto bad;
                ceph_decode_32_safe(p, end, struct_len, bad);
                ceph_decode_need(p, end, struct_len, bad);
                end = *p + struct_len;
        }

        ceph_decode_need(p, end, sizeof(**dirfrag), bad);
        *dirfrag = *p;
        *p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist);
        if (unlikely(*p > end))
                goto bad;
        if (features == (u64)-1)
                *p = end;
        return 0;
bad:
        return -EIO;
}

static int parse_reply_info_lease(void **p, void *end,
                                  struct ceph_mds_reply_lease **lease,
                                  u64 features)
{
        if (features == (u64)-1) {
                u8 struct_v, struct_compat;
                u32 struct_len;
                ceph_decode_8_safe(p, end, struct_v, bad);
                ceph_decode_8_safe(p, end, struct_compat, bad);
                /* struct_v is expected to be >= 1. we only understand
                 * encoding with struct_compat == 1. */
                if (!struct_v || struct_compat != 1)
                        goto bad;
                ceph_decode_32_safe(p, end, struct_len, bad);
                ceph_decode_need(p, end, struct_len, bad);
                end = *p + struct_len;
        }

        ceph_decode_need(p, end, sizeof(**lease), bad);
        *lease = *p;
        *p += sizeof(**lease);
        if (features == (u64)-1)
                *p = end;
        return 0;
bad:
        return -EIO;
}

/*
 * parse a normal reply, which may contain a (dir+)dentry and/or a
 * target inode.
 */
static int parse_reply_info_trace(void **p, void *end,
                                  struct ceph_mds_reply_info_parsed *info,
                                  u64 features)
{
        int err;

        if (info->head->is_dentry) {
                err = parse_reply_info_in(p, end, &info->diri, features);
                if (err < 0)
                        goto out_bad;

                err = parse_reply_info_dir(p, end, &info->dirfrag, features);
                if (err < 0)
                        goto out_bad;

                ceph_decode_32_safe(p, end, info->dname_len, bad);
                ceph_decode_need(p, end, info->dname_len, bad);
                info->dname = *p;
                *p += info->dname_len;

                err = parse_reply_info_lease(p, end, &info->dlease, features);
                if (err < 0)
                        goto out_bad;
        }

        if (info->head->is_target) {
                err = parse_reply_info_in(p, end, &info->targeti, features);
                if (err < 0)
                        goto out_bad;
        }

        if (unlikely(*p != end))
                goto bad;
        return 0;

bad:
        err = -EIO;
out_bad:
        pr_err("problem parsing mds trace %d\n", err);
        return err;
}

/*
 * parse readdir results
 */
static int parse_reply_info_readdir(void **p, void *end,
                                struct ceph_mds_reply_info_parsed *info,
                                u64 features)
{
        u32 num, i = 0;
        int err;

        err = parse_reply_info_dir(p, end, &info->dir_dir, features);
        if (err < 0)
                goto out_bad;

        ceph_decode_need(p, end, sizeof(num) + 2, bad);
        num = ceph_decode_32(p);
        {
                u16 flags = ceph_decode_16(p);
                info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
                info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
                info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
                info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
        }
        if (num == 0)
                goto done;

        BUG_ON(!info->dir_entries);
        if ((unsigned long)(info->dir_entries + num) >
            (unsigned long)info->dir_entries + info->dir_buf_size) {
                pr_err("dir contents are larger than expected\n");
                WARN_ON(1);
                goto bad;
        }

        info->dir_nr = num;
        while (num) {
                struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
                /* dentry */
                ceph_decode_32_safe(p, end, rde->name_len, bad);
                ceph_decode_need(p, end, rde->name_len, bad);
                rde->name = *p;
                *p += rde->name_len;
                dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name);

                /* dentry lease */
                err = parse_reply_info_lease(p, end, &rde->lease, features);
                if (err)
                        goto out_bad;
                /* inode */
                err = parse_reply_info_in(p, end, &rde->inode, features);
                if (err < 0)
                        goto out_bad;
                /* ceph_readdir_prepopulate() will update it */
                rde->offset = 0;
                i++;
                num--;
        }

done:
        /* Skip over any unrecognized fields */
        *p = end;
        return 0;

bad:
        err = -EIO;
out_bad:
        pr_err("problem parsing dir contents %d\n", err);
        return err;
}

/*
 * parse fcntl F_GETLK results
 */
static int parse_reply_info_filelock(void **p, void *end,
                                     struct ceph_mds_reply_info_parsed *info,
                                     u64 features)
{
        if (*p + sizeof(*info->filelock_reply) > end)
                goto bad;

        info->filelock_reply = *p;

        /* Skip over any unrecognized fields */
        *p = end;
        return 0;
bad:
        return -EIO;
}


#if BITS_PER_LONG == 64

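/*
 * Sentinel stored in the delegated-inos xarray for an inode number
 * that is available for use; xa_mk_value() encodes a small integer as
 * a non-pointer xarray entry.
 */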
#define DELEGATED_INO_AVAILABLE         xa_mk_value(1)

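/*
 * Decode the inode number ranges the MDS has delegated to this
 * session (used for async creates): a u32 count of sets, then
 * (start, len) u64 pairs.  Each ino in a range is inserted into
 * s_delegated_inos as available.
 */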
static int ceph_parse_deleg_inos(void **p, void *end,
                                 struct ceph_mds_session *s)
{
        u32 sets;

        ceph_decode_32_safe(p, end, sets, bad);
        dout("got %u sets of delegated inodes\n", sets);
        while (sets--) {
                u64 start, len, ino;

                ceph_decode_64_safe(p, end, start, bad);
                ceph_decode_64_safe(p, end, len, bad);

                /* Don't accept a delegation of system inodes */
                if (start < CEPH_INO_SYSTEM_BASE) {
                        pr_warn_ratelimited("ceph: ignoring reserved inode range delegation (start=0x%llx len=0x%llx)\n",
                                        start, len);
                        continue;
                }
                while (len--) {
                        int err = xa_insert(&s->s_delegated_inos, ino = start++,
                                            DELEGATED_INO_AVAILABLE,
                                            GFP_KERNEL);
                        if (!err) {
                                dout("added delegated inode 0x%llx\n",
                                     start - 1);
                        } else if (err == -EBUSY) {
                                pr_warn("ceph: MDS delegated inode 0x%llx more than once.\n",
                                        start - 1);
                        } else {
                                return err;
                        }
                }
        }
        return 0;
bad:
        return -EIO;
}

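/*
 * Pop one available delegated inode number from the session, or
 * return 0 if none remain.  xa_erase() removes and returns the entry
 * in one step, so an ino claimed by a racing thread is skipped.
 */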
u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
{
        unsigned long ino;
        void *val;

        xa_for_each(&s->s_delegated_inos, ino, val) {
                val = xa_erase(&s->s_delegated_inos, ino);
                if (val == DELEGATED_INO_AVAILABLE)
                        return ino;
        }
        return 0;
}

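/*
 * Put an inode number back into the delegated set, e.g. when a caller
 * obtained one via ceph_get_deleg_ino() but ended up not using it.
 */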
int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
{
        return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE,
                         GFP_KERNEL);
}
#else /* BITS_PER_LONG == 64 */
/*
 * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just
 * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top
 * and bottom words?
 */
static int ceph_parse_deleg_inos(void **p, void *end,
                                 struct ceph_mds_session *s)
{
        u32 sets;

        ceph_decode_32_safe(p, end, sets, bad);
        if (sets)
                ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad);
        return 0;
bad:
        return -EIO;
}

u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
{
        return 0;
}

int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
{
        return 0;
}
#endif /* BITS_PER_LONG == 64 */

/*
 * parse create results
 */
static int parse_reply_info_create(void **p, void *end,
                                  struct ceph_mds_reply_info_parsed *info,
                                  u64 features, struct ceph_mds_session *s)
{
        int ret;

        if (features == (u64)-1 ||
            (features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
                if (*p == end) {
                        /* Malformed reply? */
                        info->has_create_ino = false;
                } else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) {
                        u8 struct_v, struct_compat;
                        u32 len;

                        info->has_create_ino = true;
                        ceph_decode_8_safe(p, end, struct_v, bad);
                        ceph_decode_8_safe(p, end, struct_compat, bad);
                        ceph_decode_32_safe(p, end, len, bad);
                        ceph_decode_64_safe(p, end, info->ino, bad);
                        ret = ceph_parse_deleg_inos(p, end, s);
                        if (ret)
                                return ret;
                } else {
                        /* legacy */
                        ceph_decode_64_safe(p, end, info->ino, bad);
                        info->has_create_ino = true;
                }
        } else {
                if (*p != end)
                        goto bad;
        }

        /* Skip over any unrecognized fields */
        *p = end;
        return 0;
bad:
        return -EIO;
}

/*
 * parse extra results
 */
static int parse_reply_info_extra(void **p, void *end,
                                  struct ceph_mds_reply_info_parsed *info,
                                  u64 features, struct ceph_mds_session *s)
{
        u32 op = le32_to_cpu(info->head->op);

        if (op == CEPH_MDS_OP_GETFILELOCK)
                return parse_reply_info_filelock(p, end, info, features);
        else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
                return parse_reply_info_readdir(p, end, info, features);
        else if (op == CEPH_MDS_OP_CREATE)
                return parse_reply_info_create(p, end, info, features, s);
        else
                return -EIO;
}

/*
 * parse entire mds reply
 */
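/*
 * The reply body after ceph_mds_reply_head consists of three
 * length-prefixed blobs, in order: the trace (dentry/inode metadata),
 * op-specific "extra" data, and the snap blob.
 */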
static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg,
                            struct ceph_mds_reply_info_parsed *info,
                            u64 features)
{
        void *p, *end;
        u32 len;
        int err;

        info->head = msg->front.iov_base;
        p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
        end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);

        /* trace */
        ceph_decode_32_safe(&p, end, len, bad);
        if (len > 0) {
                ceph_decode_need(&p, end, len, bad);
                err = parse_reply_info_trace(&p, p+len, info, features);
                if (err < 0)
                        goto out_bad;
        }

        /* extra */
        ceph_decode_32_safe(&p, end, len, bad);
        if (len > 0) {
                ceph_decode_need(&p, end, len, bad);
                err = parse_reply_info_extra(&p, p+len, info, features, s);
                if (err < 0)
                        goto out_bad;
        }

        /* snap blob */
        ceph_decode_32_safe(&p, end, len, bad);
        info->snapblob_len = len;
        info->snapblob = p;
        p += len;

        if (p != end)
                goto bad;
        return 0;

bad:
        err = -EIO;
out_bad:
        pr_err("mds parse_reply err %d\n", err);
        return err;
}

static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
{
        if (!info->dir_entries)
                return;
        free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
}


/*
 * sessions
 */
const char *ceph_session_state_name(int s)
{
        switch (s) {
        case CEPH_MDS_SESSION_NEW: return "new";
        case CEPH_MDS_SESSION_OPENING: return "opening";
        case CEPH_MDS_SESSION_OPEN: return "open";
        case CEPH_MDS_SESSION_HUNG: return "hung";
        case CEPH_MDS_SESSION_CLOSING: return "closing";
        case CEPH_MDS_SESSION_CLOSED: return "closed";
        case CEPH_MDS_SESSION_RESTARTING: return "restarting";
        case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
        case CEPH_MDS_SESSION_REJECTED: return "rejected";
        default: return "???";
        }
}

struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s)
{
        if (refcount_inc_not_zero(&s->s_ref)) {
                dout("mdsc get_session %p %d -> %d\n", s,
                     refcount_read(&s->s_ref)-1, refcount_read(&s->s_ref));
                return s;
        } else {
                dout("mdsc get_session %p 0 -- FAIL\n", s);
                return NULL;
        }
}

void ceph_put_mds_session(struct ceph_mds_session *s)
{
        if (IS_ERR_OR_NULL(s))
                return;

        dout("mdsc put_session %p %d -> %d\n", s,
             refcount_read(&s->s_ref), refcount_read(&s->s_ref)-1);
        if (refcount_dec_and_test(&s->s_ref)) {
                if (s->s_auth.authorizer)
                        ceph_auth_destroy_authorizer(s->s_auth.authorizer);
                WARN_ON(mutex_is_locked(&s->s_mutex));
                xa_destroy(&s->s_delegated_inos);
                kfree(s);
        }
}

/*
 * called under mdsc->mutex
 */
struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
                                                   int mds)
{
        if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
                return NULL;
        return ceph_get_mds_session(mdsc->sessions[mds]);
}

static bool __have_session(struct ceph_mds_client *mdsc, int mds)
{
        if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
                return false;
        else
                return true;
}

static int __verify_registered_session(struct ceph_mds_client *mdsc,
                                       struct ceph_mds_session *s)
{
        if (s->s_mds >= mdsc->max_sessions ||
            mdsc->sessions[s->s_mds] != s)
                return -ENOENT;
        return 0;
}

/*
 * create+register a new session for given mds.
 * called under mdsc->mutex.
 */
static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
                                                 int mds)
{
        struct ceph_mds_session *s;

        if (mds >= mdsc->mdsmap->possible_max_rank)
                return ERR_PTR(-EINVAL);

        s = kzalloc(sizeof(*s), GFP_NOFS);
        if (!s)
                return ERR_PTR(-ENOMEM);

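        /*
         * The session array is a flat table indexed by mds rank; grow
         * it to the next power of two large enough to hold this rank.
         */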
        if (mds >= mdsc->max_sessions) {
                int newmax = 1 << get_count_order(mds + 1);
                struct ceph_mds_session **sa;

                dout("%s: realloc to %d\n", __func__, newmax);
                sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
                if (!sa)
                        goto fail_realloc;
                if (mdsc->sessions) {
                        memcpy(sa, mdsc->sessions,
                               mdsc->max_sessions * sizeof(void *));
                        kfree(mdsc->sessions);
                }
                mdsc->sessions = sa;
                mdsc->max_sessions = newmax;
        }

        dout("%s: mds%d\n", __func__, mds);
        s->s_mdsc = mdsc;
        s->s_mds = mds;
        s->s_state = CEPH_MDS_SESSION_NEW;
        s->s_ttl = 0;
        s->s_seq = 0;
        mutex_init(&s->s_mutex);

        ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);

        spin_lock_init(&s->s_gen_ttl_lock);
        s->s_cap_gen = 1;
        s->s_cap_ttl = jiffies - 1;

        spin_lock_init(&s->s_cap_lock);
        s->s_renew_requested = 0;
        s->s_renew_seq = 0;
        INIT_LIST_HEAD(&s->s_caps);
        s->s_nr_caps = 0;
        refcount_set(&s->s_ref, 1);
        INIT_LIST_HEAD(&s->s_waiting);
        INIT_LIST_HEAD(&s->s_unsafe);
        xa_init(&s->s_delegated_inos);
        s->s_num_cap_releases = 0;
        s->s_cap_reconnect = 0;
        s->s_cap_iterator = NULL;
        INIT_LIST_HEAD(&s->s_cap_releases);
        INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);

        INIT_LIST_HEAD(&s->s_cap_dirty);
        INIT_LIST_HEAD(&s->s_cap_flushing);

        mdsc->sessions[mds] = s;
        atomic_inc(&mdsc->num_sessions);
        refcount_inc(&s->s_ref);  /* one ref to sessions[], one to caller */

        ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
                      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));

        return s;

fail_realloc:
        kfree(s);
        return ERR_PTR(-ENOMEM);
}

/*
 * called under mdsc->mutex
 */
static void __unregister_session(struct ceph_mds_client *mdsc,
                               struct ceph_mds_session *s)
{
        dout("__unregister_session mds%d %p\n", s->s_mds, s);
        BUG_ON(mdsc->sessions[s->s_mds] != s);
        mdsc->sessions[s->s_mds] = NULL;
        ceph_con_close(&s->s_con);
        ceph_put_mds_session(s);
        atomic_dec(&mdsc->num_sessions);
}

/*
 * drop session refs in request.
 *
 * should be last request ref, or hold mdsc->mutex
 */
static void put_request_session(struct ceph_mds_request *req)
{
        if (req->r_session) {
                ceph_put_mds_session(req->r_session);
                req->r_session = NULL;
        }
}

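/*
 * Run a callback on every registered session.  mdsc->mutex is dropped
 * around each invocation, so each session is pinned with a reference
 * for the duration of its callback.
 */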
void ceph_mdsc_iterate_sessions(struct ceph_mds_client *mdsc,
                                void (*cb)(struct ceph_mds_session *),
                                bool check_state)
{
        int mds;

        mutex_lock(&mdsc->mutex);
        for (mds = 0; mds < mdsc->max_sessions; ++mds) {
                struct ceph_mds_session *s;

                s = __ceph_lookup_mds_session(mdsc, mds);
                if (!s)
                        continue;

                if (check_state && !check_session_state(s)) {
                        ceph_put_mds_session(s);
                        continue;
                }

                mutex_unlock(&mdsc->mutex);
                cb(s);
                ceph_put_mds_session(s);
                mutex_lock(&mdsc->mutex);
        }
        mutex_unlock(&mdsc->mutex);
}

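/*
 * Final teardown once the last reference to a request is dropped:
 * release the cap pins, inode and dentry references, messages and
 * pagelist attached to the request.
 */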
void ceph_mdsc_release_request(struct kref *kref)
{
        struct ceph_mds_request *req = container_of(kref,
                                                    struct ceph_mds_request,
                                                    r_kref);
        ceph_mdsc_release_dir_caps_no_check(req);
        destroy_reply_info(&req->r_reply_info);
        if (req->r_request)
                ceph_msg_put(req->r_request);
        if (req->r_reply)
                ceph_msg_put(req->r_reply);
        if (req->r_inode) {
                ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
                /* avoid calling iput_final() in mds dispatch threads */
                ceph_async_iput(req->r_inode);
        }
        if (req->r_parent) {
                ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
                ceph_async_iput(req->r_parent);
        }
        ceph_async_iput(req->r_target_inode);
        if (req->r_dentry)
                dput(req->r_dentry);
        if (req->r_old_dentry)
                dput(req->r_old_dentry);
        if (req->r_old_dentry_dir) {
                /*
                 * track (and drop pins for) r_old_dentry_dir
                 * separately, since r_old_dentry's d_parent may have
                 * changed between the dir mutex being dropped and
                 * this request being freed.
                 */
                ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
                                  CEPH_CAP_PIN);
                ceph_async_iput(req->r_old_dentry_dir);
        }
        kfree(req->r_path1);
        kfree(req->r_path2);
        if (req->r_pagelist)
                ceph_pagelist_release(req->r_pagelist);
        put_request_session(req);
        ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
        WARN_ON_ONCE(!list_empty(&req->r_wait));
        kmem_cache_free(ceph_mds_request_cachep, req);
}

DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)

/*
 * lookup request, bump ref if found.
 *
 * called under mdsc->mutex.
 */
static struct ceph_mds_request *
lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
{
        struct ceph_mds_request *req;

        req = lookup_request(&mdsc->request_tree, tid);
        if (req)
                ceph_mdsc_get_request(req);

        return req;
}

/*
 * Register an in-flight request, and assign a tid.  Link to the
 * directory we are modifying (if any).
 *
 * Called under mdsc->mutex.
 */
static void __register_request(struct ceph_mds_client *mdsc,
                               struct ceph_mds_request *req,
                               struct inode *dir)
{
        int ret = 0;

        req->r_tid = ++mdsc->last_tid;
        if (req->r_num_caps) {
                ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation,
                                        req->r_num_caps);
                if (ret < 0) {
                        pr_err("__register_request %p "
                               "failed to reserve caps: %d\n", req, ret);
                        /* set req->r_err to fail early from __do_request */
                        req->r_err = ret;
                        return;
                }
        }
        dout("__register_request %p tid %lld\n", req, req->r_tid);
        ceph_mdsc_get_request(req);
        insert_request(&mdsc->request_tree, req);

        req->r_uid = current_fsuid();
        req->r_gid = current_fsgid();

        if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
                mdsc->oldest_tid = req->r_tid;

        if (dir) {
                struct ceph_inode_info *ci = ceph_inode(dir);

                ihold(dir);
                req->r_unsafe_dir = dir;
                spin_lock(&ci->i_unsafe_lock);
                list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
                spin_unlock(&ci->i_unsafe_lock);
        }
}

static void __unregister_request(struct ceph_mds_client *mdsc,
                                 struct ceph_mds_request *req)
{
        dout("__unregister_request %p tid %lld\n", req, req->r_tid);

        /* Never leave an unregistered request on an unsafe list! */
        list_del_init(&req->r_unsafe_item);

        if (req->r_tid == mdsc->oldest_tid) {
                struct rb_node *p = rb_next(&req->r_node);
                mdsc->oldest_tid = 0;
                while (p) {
                        struct ceph_mds_request *next_req =
                                rb_entry(p, struct ceph_mds_request, r_node);
                        if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
                                mdsc->oldest_tid = next_req->r_tid;
                                break;
                        }
                        p = rb_next(p);
                }
        }

        erase_request(&mdsc->request_tree, req);

        if (req->r_unsafe_dir) {
                struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
                spin_lock(&ci->i_unsafe_lock);
                list_del_init(&req->r_unsafe_dir_item);
                spin_unlock(&ci->i_unsafe_lock);
        }
        if (req->r_target_inode &&
            test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
                struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
                spin_lock(&ci->i_unsafe_lock);
                list_del_init(&req->r_unsafe_target_item);
                spin_unlock(&ci->i_unsafe_lock);
        }

        if (req->r_unsafe_dir) {
                /* avoid calling iput_final() in mds dispatch threads */
                ceph_async_iput(req->r_unsafe_dir);
                req->r_unsafe_dir = NULL;
        }

        complete_all(&req->r_safe_completion);

        ceph_mdsc_put_request(req);
}

/*
 * Walk back up the dentry tree until we hit a dentry representing a
 * non-snapshot inode. We do this using the rcu_read_lock (which must be held
 * when calling this) to ensure that the objects won't disappear while we're
 * working with them. Once we hit a candidate dentry, we attempt to take a
 * reference to it, and return that as the result.
 */
static struct inode *get_nonsnap_parent(struct dentry *dentry)
{
        struct inode *inode = NULL;

        while (dentry && !IS_ROOT(dentry)) {
                inode = d_inode_rcu(dentry);
                if (!inode || ceph_snap(inode) == CEPH_NOSNAP)
                        break;
                dentry = dentry->d_parent;
        }
        if (inode)
                inode = igrab(inode);
        return inode;
}

/*
 * Choose mds to send request to next.  If there is a hint set in the
 * request (e.g., due to a prior forward hint from the mds), use that.
 * Otherwise, consult frag tree and/or caps to identify the
 * appropriate mds.  If all else fails, choose randomly.
 *
 * Called under mdsc->mutex.
 */
static int __choose_mds(struct ceph_mds_client *mdsc,
                        struct ceph_mds_request *req,
                        bool *random)
{
        struct inode *inode;
        struct ceph_inode_info *ci;
        struct ceph_cap *cap;
        int mode = req->r_direct_mode;
        int mds = -1;
        u32 hash = req->r_direct_hash;
        bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);

        if (random)
                *random = false;

        /*
         * is there a specific mds we should try?  ignore hint if we have
         * no session and the mds is not up (active or recovering).
         */
        if (req->r_resend_mds >= 0 &&
            (__have_session(mdsc, req->r_resend_mds) ||
             ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
                dout("%s using resend_mds mds%d\n", __func__,
                     req->r_resend_mds);
                return req->r_resend_mds;
        }

        if (mode == USE_RANDOM_MDS)
                goto random;

        inode = NULL;
        if (req->r_inode) {
                if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) {
                        inode = req->r_inode;
                        ihold(inode);
                } else {
                        /* req->r_dentry is non-null for LSSNAP request */
                        rcu_read_lock();
                        inode = get_nonsnap_parent(req->r_dentry);
                        rcu_read_unlock();
                        dout("%s using snapdir's parent %p\n", __func__, inode);
                }
        } else if (req->r_dentry) {
                /* ignore race with rename; old or new d_parent is okay */
                struct dentry *parent;
                struct inode *dir;

                rcu_read_lock();
                parent = READ_ONCE(req->r_dentry->d_parent);
                dir = req->r_parent ? : d_inode_rcu(parent);

                if (!dir || dir->i_sb != mdsc->fsc->sb) {
                        /* not this fs or parent went negative */
                        inode = d_inode(req->r_dentry);
                        if (inode)
                                ihold(inode);
                } else if (ceph_snap(dir) != CEPH_NOSNAP) {
                        /* direct snapped/virtual snapdir requests
                         * based on parent dir inode */
                        inode = get_nonsnap_parent(parent);
                        dout("%s using nonsnap parent %p\n", __func__, inode);
                } else {
                        /* dentry target */
                        inode = d_inode(req->r_dentry);
                        if (!inode || mode == USE_AUTH_MDS) {
                                /* dir + name */
                                inode = igrab(dir);
                                hash = ceph_dentry_hash(dir, req->r_dentry);
                                is_hash = true;
                        } else {
                                ihold(inode);
                        }
                }
                rcu_read_unlock();
        }

        dout("%s %p is_hash=%d (0x%x) mode %d\n", __func__, inode, (int)is_hash,
             hash, mode);
        if (!inode)
                goto random;
        ci = ceph_inode(inode);

        if (is_hash && S_ISDIR(inode->i_mode)) {
                struct ceph_inode_frag frag;
                int found;

                ceph_choose_frag(ci, hash, &frag, &found);
                if (found) {
                        if (mode == USE_ANY_MDS && frag.ndist > 0) {
                                u8 r;

                                /* choose a random replica */
                                get_random_bytes(&r, 1);
                                r %= frag.ndist;
                                mds = frag.dist[r];
                                dout("%s %p %llx.%llx frag %u mds%d (%d/%d)\n",
                                     __func__, inode, ceph_vinop(inode),
                                     frag.frag, mds, (int)r, frag.ndist);
                                if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
                                    CEPH_MDS_STATE_ACTIVE &&
                                    !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds))
                                        goto out;
                        }

                        /* since this file/dir wasn't known to be
                         * replicated, then we want to look for the
                         * authoritative mds. */
                        if (frag.mds >= 0) {
                                /* choose auth mds */
                                mds = frag.mds;
                                dout("%s %p %llx.%llx frag %u mds%d (auth)\n",
                                     __func__, inode, ceph_vinop(inode),
                                     frag.frag, mds);
                                if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
                                    CEPH_MDS_STATE_ACTIVE) {
                                        if (!ceph_mdsmap_is_laggy(mdsc->mdsmap,
                                                                  mds))
                                                goto out;
                                }
                        }
                        mode = USE_AUTH_MDS;
                }
        }

        spin_lock(&ci->i_ceph_lock);
        cap = NULL;
        if (mode == USE_AUTH_MDS)
                cap = ci->i_auth_cap;
        if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
                cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
        if (!cap) {
                spin_unlock(&ci->i_ceph_lock);
                ceph_async_iput(inode);
                goto random;
        }
        mds = cap->session->s_mds;
        dout("%s %p %llx.%llx mds%d (%scap %p)\n", __func__,
             inode, ceph_vinop(inode), mds,
             cap == ci->i_auth_cap ? "auth " : "", cap);
        spin_unlock(&ci->i_ceph_lock);
out:
        /* avoid calling iput_final() while holding mdsc->mutex or
         * in mds dispatch threads */
        ceph_async_iput(inode);
        return mds;

random:
        if (random)
                *random = true;

        mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
        dout("%s chose random mds%d\n", __func__, mds);
        return mds;
}


/*
 * session messages
 */
struct ceph_msg *ceph_create_session_msg(u32 op, u64 seq)
{
        struct ceph_msg *msg;
        struct ceph_mds_session_head *h;

        msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
                           false);
        if (!msg) {
                pr_err("ENOMEM creating session %s msg\n",
                       ceph_session_op_name(op));
                return NULL;
        }
        h = msg->front.iov_base;
        h->op = cpu_to_le32(op);
        h->seq = cpu_to_le64(seq);

        return msg;
}

static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
#define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8)
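/*
 * The feature bitmap is encoded in 64-bit chunks, so FEATURE_BYTES
 * rounds the highest supported bit up to a multiple of 64 bits and
 * converts that to bytes.  On the wire, the bitmap is preceded by a
 * u32 byte count.
 */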
static int encode_supported_features(void **p, void *end)
{
        static const size_t count = ARRAY_SIZE(feature_bits);

        if (count > 0) {
                size_t i;
                size_t size = FEATURE_BYTES(count);
                unsigned long bit;

                if (WARN_ON_ONCE(*p + 4 + size > end))
                        return -ERANGE;

                ceph_encode_32(p, size);
                memset(*p, 0, size);
                for (i = 0; i < count; i++) {
                        bit = feature_bits[i];
                        ((unsigned char *)(*p))[bit / 8] |= BIT(bit % 8);
                }
                *p += size;
        } else {
                if (WARN_ON_ONCE(*p + 4 > end))
                        return -ERANGE;

                ceph_encode_32(p, 0);
        }

        return 0;
}

static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED;
#define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8)
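/*
 * The metric spec is encoded as a one-byte version and one-byte
 * compat header, a u32 spec-info length, and then the metric bitmap
 * (itself prefixed with a u32 length and padded to a multiple of
 * 64 bits, like the feature bitmap above).
 */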
static int encode_metric_spec(void **p, void *end)
{
        static const size_t count = ARRAY_SIZE(metric_bits);

        /* header */
        if (WARN_ON_ONCE(*p + 2 > end))
                return -ERANGE;

        ceph_encode_8(p, 1); /* version */
        ceph_encode_8(p, 1); /* compat */

        if (count > 0) {
                size_t i;
                size_t size = METRIC_BYTES(count);

                if (WARN_ON_ONCE(*p + 4 + 4 + size > end))
                        return -ERANGE;

                /* metric spec info length */
                ceph_encode_32(p, 4 + size);

                /* metric spec */
                ceph_encode_32(p, size);
                memset(*p, 0, size);
                for (i = 0; i < count; i++)
                        ((unsigned char *)(*p))[metric_bits[i] / 8] |=
                                BIT(metric_bits[i] % 8);
                *p += size;
        } else {
                if (WARN_ON_ONCE(*p + 4 + 4 > end))
                        return -ERANGE;

                /* metric spec info length */
                ceph_encode_32(p, 4);
                /* metric spec */
                ceph_encode_32(p, 0);
        }

        return 0;
}

/*
 * session message, specialization for CEPH_SESSION_REQUEST_OPEN
 * to include additional client metadata fields.
 */
static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u64 seq)
{
        struct ceph_msg *msg;
        struct ceph_mds_session_head *h;
        int i = -1;
        int extra_bytes = 0;
        int metadata_key_count = 0;
        struct ceph_options *opt = mdsc->fsc->client->options;
        struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
        size_t size, count;
        void *p, *end;
        int ret;

        const char *metadata[][2] = {
                {"hostname", mdsc->nodename},
                {"kernel_version", init_utsname()->release},
                {"entity_id", opt->name ? : ""},
                {"root", fsopt->server_path ? : "/"},
                {NULL, NULL}
        };

        /* Calculate serialized length of metadata */
        extra_bytes = 4;  /* map length */
        for (i = 0; metadata[i][0]; ++i) {
                extra_bytes += 8 + strlen(metadata[i][0]) +
                        strlen(metadata[i][1]);
                metadata_key_count++;
        }

        /* supported feature */
        size = 0;
        count = ARRAY_SIZE(feature_bits);
        if (count > 0)
                size = FEATURE_BYTES(count);
        extra_bytes += 4 + size;

        /* metric spec */
        size = 0;
        count = ARRAY_SIZE(metric_bits);
        if (count > 0)
                size = METRIC_BYTES(count);
        extra_bytes += 2 + 4 + 4 + size;

        /* Allocate the message */
        msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
                           GFP_NOFS, false);
        if (!msg) {
                pr_err("ENOMEM creating session open msg\n");
                return ERR_PTR(-ENOMEM);
        }
        p = msg->front.iov_base;
        end = p + msg->front.iov_len;

        h = p;
        h->op = cpu_to_le32(CEPH_SESSION_REQUEST_OPEN);
        h->seq = cpu_to_le64(seq);

        /*
         * Serialize client metadata into waiting buffer space, using
         * the format that userspace expects for map<string, string>
         *
         * ClientSession messages with metadata are v4
         */
        msg->hdr.version = cpu_to_le16(4);
        msg->hdr.compat_version = cpu_to_le16(1);

        /* The write pointer, following the session_head structure */
        p += sizeof(*h);

        /* Number of entries in the map */
        ceph_encode_32(&p, metadata_key_count);

        /* Two length-prefixed strings for each entry in the map */
        for (i = 0; metadata[i][0]; ++i) {
                size_t const key_len = strlen(metadata[i][0]);
                size_t const val_len = strlen(metadata[i][1]);

                ceph_encode_32(&p, key_len);
                memcpy(p, metadata[i][0], key_len);
                p += key_len;
                ceph_encode_32(&p, val_len);
                memcpy(p, metadata[i][1], val_len);
                p += val_len;
        }

        ret = encode_supported_features(&p, end);
        if (ret) {
                pr_err("encode_supported_features failed!\n");
                ceph_msg_put(msg);
                return ERR_PTR(ret);
        }

        ret = encode_metric_spec(&p, end);
        if (ret) {
                pr_err("encode_metric_spec failed!\n");
                ceph_msg_put(msg);
                return ERR_PTR(ret);
        }

        msg->front.iov_len = p - msg->front.iov_base;
        msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

        return msg;
}

/*
 * send session open request.
 *
 * called under mdsc->mutex
 */
static int __open_session(struct ceph_mds_client *mdsc,
                          struct ceph_mds_session *session)
{
        struct ceph_msg *msg;
        int mstate;
        int mds = session->s_mds;

        /* wait for mds to go active? */
        mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
        dout("open_session to mds%d (%s)\n", mds,
             ceph_mds_state_name(mstate));
        session->s_state = CEPH_MDS_SESSION_OPENING;
        session->s_renew_requested = jiffies;

        /* send connect message */
        msg = create_session_open_msg(mdsc, session->s_seq);
        if (IS_ERR(msg))
                return PTR_ERR(msg);
        ceph_con_send(&session->s_con, msg);
        return 0;
}

/*
 * open sessions for any export targets for the given mds
 *
 * called under mdsc->mutex
 */
static struct ceph_mds_session *
__open_export_target_session(struct ceph_mds_client *mdsc, int target)
{
        struct ceph_mds_session *session;
        int ret;

        session = __ceph_lookup_mds_session(mdsc, target);
        if (!session) {
                session = register_session(mdsc, target);
                if (IS_ERR(session))
                        return session;
        }
        if (session->s_state == CEPH_MDS_SESSION_NEW ||
            session->s_state == CEPH_MDS_SESSION_CLOSING) {
                ret = __open_session(mdsc, session);
                if (ret)
                        return ERR_PTR(ret);
        }

        return session;
}

struct ceph_mds_session *
ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
{
        struct ceph_mds_session *session;

        dout("open_export_target_session to mds%d\n", target);

        mutex_lock(&mdsc->mutex);
        session = __open_export_target_session(mdsc, target);
        mutex_unlock(&mdsc->mutex);

        return session;
}

static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
                                          struct ceph_mds_session *session)
{
        struct ceph_mds_info *mi;
        struct ceph_mds_session *ts;
        int i, mds = session->s_mds;

        if (mds >= mdsc->mdsmap->possible_max_rank)
                return;

        mi = &mdsc->mdsmap->m_info[mds];
        dout("open_export_target_sessions for mds%d (%d targets)\n",
             session->s_mds, mi->num_export_targets);

        for (i = 0; i < mi->num_export_targets; i++) {
                ts = __open_export_target_session(mdsc, mi->export_targets[i]);
                ceph_put_mds_session(ts);
        }
}

void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
                                           struct ceph_mds_session *session)
{
        mutex_lock(&mdsc->mutex);
        __open_export_target_sessions(mdsc, session);
        mutex_unlock(&mdsc->mutex);
}

/*
 * session caps
 */

static void detach_cap_releases(struct ceph_mds_session *session,
                                struct list_head *target)
{
        lockdep_assert_held(&session->s_cap_lock);

        list_splice_init(&session->s_cap_releases, target);
        session->s_num_cap_releases = 0;
        dout("detach_cap_releases mds%d\n", session->s_mds);
}

1498 static void dispose_cap_releases(struct ceph_mds_client *mdsc,
1499                                  struct list_head *dispose)
1500 {
1501         while (!list_empty(dispose)) {
1502                 struct ceph_cap *cap;
1503                 /* remove each cap from the list and release it */
1504                 cap = list_first_entry(dispose, struct ceph_cap, session_caps);
1505                 list_del(&cap->session_caps);
1506                 ceph_put_cap(mdsc, cap);
1507         }
1508 }
1509
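/*
 * Drop all unsafe requests attached to this session, and zero the
 * attempt count on anything else it was handling so kick_requests()
 * will resend those requests from scratch.
 */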
1510 static void cleanup_session_requests(struct ceph_mds_client *mdsc,
1511                                      struct ceph_mds_session *session)
1512 {
1513         struct ceph_mds_request *req;
1514         struct rb_node *p;
1515
1516         dout("cleanup_session_requests mds%d\n", session->s_mds);
1517         mutex_lock(&mdsc->mutex);
1518         while (!list_empty(&session->s_unsafe)) {
1519                 req = list_first_entry(&session->s_unsafe,
1520                                        struct ceph_mds_request, r_unsafe_item);
1521                 pr_warn_ratelimited(" dropping unsafe request %llu\n",
1522                                     req->r_tid);
1523                 if (req->r_target_inode)
1524                         mapping_set_error(req->r_target_inode->i_mapping, -EIO);
1525                 if (req->r_unsafe_dir)
1526                         mapping_set_error(req->r_unsafe_dir->i_mapping, -EIO);
1527                 __unregister_request(mdsc, req);
1528         }
1529         /* zero r_attempts, so kick_requests() will re-send requests */
1530         p = rb_first(&mdsc->request_tree);
1531         while (p) {
1532                 req = rb_entry(p, struct ceph_mds_request, r_node);
1533                 p = rb_next(p);
1534                 if (req->r_session &&
1535                     req->r_session->s_mds == session->s_mds)
1536                         req->r_attempts = 0;
1537         }
1538         mutex_unlock(&mdsc->mutex);
1539 }
1540
1541 /*
1542  * Helper to safely iterate over all caps associated with a session, with
1543  * special care taken to handle a racing __ceph_remove_cap().
1544  *
1545  * Caller must hold session s_mutex.
1546  */
1547 int ceph_iterate_session_caps(struct ceph_mds_session *session,
1548                               int (*cb)(struct inode *, struct ceph_cap *,
1549                                         void *), void *arg)
1550 {
1551         struct list_head *p;
1552         struct ceph_cap *cap;
1553         struct inode *inode, *last_inode = NULL;
1554         struct ceph_cap *old_cap = NULL;
1555         int ret;
1556
1557         dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
1558         spin_lock(&session->s_cap_lock);
1559         p = session->s_caps.next;
1560         while (p != &session->s_caps) {
1561                 cap = list_entry(p, struct ceph_cap, session_caps);
1562                 inode = igrab(&cap->ci->vfs_inode);
1563                 if (!inode) {
1564                         p = p->next;
1565                         continue;
1566                 }
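                /*
                 * Pin this cap via s_cap_iterator so that a racing
                 * __ceph_remove_cap() leaves the final unlinking to us
                 * (the !cap->ci case below) while we drop s_cap_lock to
                 * run the callback.
                 */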
1567                 session->s_cap_iterator = cap;
1568                 spin_unlock(&session->s_cap_lock);
1569
1570                 if (last_inode) {
1571                         /* avoid calling iput_final() while holding
1572                          * s_mutex or in mds dispatch threads */
1573                         ceph_async_iput(last_inode);
1574                         last_inode = NULL;
1575                 }
1576                 if (old_cap) {
1577                         ceph_put_cap(session->s_mdsc, old_cap);
1578                         old_cap = NULL;
1579                 }
1580
1581                 ret = cb(inode, cap, arg);
1582                 last_inode = inode;
1583
1584                 spin_lock(&session->s_cap_lock);
1585                 p = p->next;
1586                 if (!cap->ci) {
1587                         dout("iterate_session_caps  finishing cap %p removal\n",
1588                              cap);
1589                         BUG_ON(cap->session != session);
1590                         cap->session = NULL;
1591                         list_del_init(&cap->session_caps);
1592                         session->s_nr_caps--;
1593                         atomic64_dec(&session->s_mdsc->metric.total_caps);
1594                         if (cap->queue_release)
1595                                 __ceph_queue_cap_release(session, cap);
1596                         else
1597                                 old_cap = cap;  /* put_cap it w/o locks held */
1598                 }
1599                 if (ret < 0)
1600                         goto out;
1601         }
1602         ret = 0;
1603 out:
1604         session->s_cap_iterator = NULL;
1605         spin_unlock(&session->s_cap_lock);
1606
1607         ceph_async_iput(last_inode);
1608         if (old_cap)
1609                 ceph_put_cap(session->s_mdsc, old_cap);
1610
1611         return ret;
1612 }
1613
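/*
 * Tear down all remaining cap snaps on this inode.  Returns how many
 * were removed; each one pins the inode, so the caller must drop that
 * many inode references once i_ceph_lock is released.
 */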
1614 static int remove_capsnaps(struct ceph_mds_client *mdsc, struct inode *inode)
1615 {
1616         struct ceph_inode_info *ci = ceph_inode(inode);
1617         struct ceph_cap_snap *capsnap;
1618         int capsnap_release = 0;
1619
1620         lockdep_assert_held(&ci->i_ceph_lock);
1621
1622         dout("removing capsnaps, ci is %p, inode is %p\n", ci, inode);
1623
1624         while (!list_empty(&ci->i_cap_snaps)) {
1625                 capsnap = list_first_entry(&ci->i_cap_snaps,
1626                                            struct ceph_cap_snap, ci_item);
1627                 __ceph_remove_capsnap(inode, capsnap, NULL, NULL);
1628                 ceph_put_snap_context(capsnap->context);
1629                 ceph_put_cap_snap(capsnap);
1630                 capsnap_release++;
1631         }
1632         wake_up_all(&ci->i_cap_wq);
1633         wake_up_all(&mdsc->cap_flushing_wq);
1634         return capsnap_release;
1635 }
1636
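/*
 * Iteration callback that forcibly removes one cap.  If the inode is
 * left with no auth cap, any dirty or flushing state is discarded and
 * the error is recorded against the inode's mapping.
 */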
1637 static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
1638                                   void *arg)
1639 {
1640         struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg;
1641         struct ceph_mds_client *mdsc = fsc->mdsc;
1642         struct ceph_inode_info *ci = ceph_inode(inode);
1643         LIST_HEAD(to_remove);
1644         bool dirty_dropped = false;
1645         bool invalidate = false;
1646         int capsnap_release = 0;
1647
1648         dout("removing cap %p, ci is %p, inode is %p\n",
1649              cap, ci, &ci->vfs_inode);
1650         spin_lock(&ci->i_ceph_lock);
1651         __ceph_remove_cap(cap, false);
1652         if (!ci->i_auth_cap) {
1653                 struct ceph_cap_flush *cf;
1654
1655                 if (READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
1656                         if (inode->i_data.nrpages > 0)
1657                                 invalidate = true;
1658                         if (ci->i_wrbuffer_ref > 0)
1659                                 mapping_set_error(&inode->i_data, -EIO);
1660                 }
1661
1662                 while (!list_empty(&ci->i_cap_flush_list)) {
1663                         cf = list_first_entry(&ci->i_cap_flush_list,
1664                                               struct ceph_cap_flush, i_list);
1665                         list_move(&cf->i_list, &to_remove);
1666                 }
1667
1668                 spin_lock(&mdsc->cap_dirty_lock);
1669
1670                 list_for_each_entry(cf, &to_remove, i_list)
1671                         list_del_init(&cf->g_list);
1672
1673                 if (!list_empty(&ci->i_dirty_item)) {
1674                         pr_warn_ratelimited(
1675                                 " dropping dirty %s state for %p %lld\n",
1676                                 ceph_cap_string(ci->i_dirty_caps),
1677                                 inode, ceph_ino(inode));
1678                         ci->i_dirty_caps = 0;
1679                         list_del_init(&ci->i_dirty_item);
1680                         dirty_dropped = true;
1681                 }
1682                 if (!list_empty(&ci->i_flushing_item)) {
1683                         pr_warn_ratelimited(
1684                                 " dropping dirty+flushing %s state for %p %lld\n",
1685                                 ceph_cap_string(ci->i_flushing_caps),
1686                                 inode, ceph_ino(inode));
1687                         ci->i_flushing_caps = 0;
1688                         list_del_init(&ci->i_flushing_item);
1689                         mdsc->num_cap_flushing--;
1690                         dirty_dropped = true;
1691                 }
1692                 spin_unlock(&mdsc->cap_dirty_lock);
1693
1694                 if (dirty_dropped) {
1695                         mapping_set_error(inode->i_mapping, -EIO);
1696
1697                         if (ci->i_wrbuffer_ref_head == 0 &&
1698                             ci->i_wr_ref == 0 &&
1699                             ci->i_dirty_caps == 0 &&
1700                             ci->i_flushing_caps == 0) {
1701                                 ceph_put_snap_context(ci->i_head_snapc);
1702                                 ci->i_head_snapc = NULL;
1703                         }
1704                 }
1705
1706                 if (atomic_read(&ci->i_filelock_ref) > 0) {
1707                         /* make further file lock syscalls return -EIO */
1708                         ci->i_ceph_flags |= CEPH_I_ERROR_FILELOCK;
1709                         pr_warn_ratelimited(" dropping file locks for %p %lld\n",
1710                                             inode, ceph_ino(inode));
1711                 }
1712
1713                 if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
1714                         list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove);
1715                         ci->i_prealloc_cap_flush = NULL;
1716                 }
1717
1718                 if (!list_empty(&ci->i_cap_snaps))
1719                         capsnap_release = remove_capsnaps(mdsc, inode);
1720         }
1721         spin_unlock(&ci->i_ceph_lock);
1722         while (!list_empty(&to_remove)) {
1723                 struct ceph_cap_flush *cf;
1724                 cf = list_first_entry(&to_remove,
1725                                       struct ceph_cap_flush, i_list);
1726                 list_del_init(&cf->i_list);
1727                 if (!cf->is_capsnap)
1728                         ceph_free_cap_flush(cf);
1729         }
1730
1731         wake_up_all(&ci->i_cap_wq);
1732         if (invalidate)
1733                 ceph_queue_invalidate(inode);
1734         if (dirty_dropped)
1735                 iput(inode);
1736         while (capsnap_release--)
1737                 iput(inode);
1738         return 0;
1739 }
1740
1741 /*
1742  * caller must hold session s_mutex
1743  */
1744 static void remove_session_caps(struct ceph_mds_session *session)
1745 {
1746         struct ceph_fs_client *fsc = session->s_mdsc->fsc;
1747         struct super_block *sb = fsc->sb;
1748         LIST_HEAD(dispose);
1749
1750         dout("remove_session_caps on %p\n", session);
1751         ceph_iterate_session_caps(session, remove_session_caps_cb, fsc);
1752
1753         wake_up_all(&fsc->mdsc->cap_flushing_wq);
1754
1755         spin_lock(&session->s_cap_lock);
1756         if (session->s_nr_caps > 0) {
1757                 struct inode *inode;
1758                 struct ceph_cap *cap, *prev = NULL;
1759                 struct ceph_vino vino;
1760                 /*
1761                  * iterate_session_caps() skips inodes that are being
1762                  * deleted; we need to wait until deletions are complete.
1763                  * __wait_on_freeing_inode() is designed for the job, but
1764                  * it is not exported, so use the inode lookup function to
1765                  * get the same effect.
1766                  */
1767                 while (!list_empty(&session->s_caps)) {
1768                         cap = list_entry(session->s_caps.next,
1769                                          struct ceph_cap, session_caps);
1770                         if (cap == prev)
1771                                 break;
1772                         prev = cap;
1773                         vino = cap->ci->i_vino;
1774                         spin_unlock(&session->s_cap_lock);
1775
1776                         inode = ceph_find_inode(sb, vino);
1777                          /* avoid calling iput_final() while holding s_mutex */
1778                         ceph_async_iput(inode);
1779
1780                         spin_lock(&session->s_cap_lock);
1781                 }
1782         }
1783
1784         /* detach queued cap releases; they are disposed of after we unlock */
1785         detach_cap_releases(session, &dispose);
1786
1787         BUG_ON(session->s_nr_caps > 0);
1788         BUG_ON(!list_empty(&session->s_cap_flushing));
1789         spin_unlock(&session->s_cap_lock);
1790         dispose_cap_releases(session->s_mdsc, &dispose);
1791 }
1792
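/* events passed to wake_up_session_cb() via wake_up_session_caps() */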
1793 enum {
1794         RECONNECT,
1795         RENEWCAPS,
1796         FORCE_RO,
1797 };
1798
1799 /*
1800  * wake up any threads waiting on this session's caps.  if the cap is
1801  * old (didn't get renewed on the client reconnect), remove it now.
1802  *
1803  * caller must hold s_mutex.
1804  */
1805 static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
1806                               void *arg)
1807 {
1808         struct ceph_inode_info *ci = ceph_inode(inode);
1809         unsigned long ev = (unsigned long)arg;
1810
1811         if (ev == RECONNECT) {
1812                 spin_lock(&ci->i_ceph_lock);
1813                 ci->i_wanted_max_size = 0;
1814                 ci->i_requested_max_size = 0;
1815                 spin_unlock(&ci->i_ceph_lock);
1816         } else if (ev == RENEWCAPS) {
1817                 if (cap->cap_gen < cap->session->s_cap_gen) {
1818                         /* mds did not re-issue stale cap */
1819                         spin_lock(&ci->i_ceph_lock);
1820                         cap->issued = cap->implemented = CEPH_CAP_PIN;
1821                         spin_unlock(&ci->i_ceph_lock);
1822                 }
1823         } else if (ev == FORCE_RO) {
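                /* nothing to fix up; waking the waiters below is enough */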
1824         }
1825         wake_up_all(&ci->i_cap_wq);
1826         return 0;
1827 }
1828
1829 static void wake_up_session_caps(struct ceph_mds_session *session, int ev)
1830 {
1831         dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
1832         ceph_iterate_session_caps(session, wake_up_session_cb,
1833                                   (void *)(unsigned long)ev);
1834 }
1835
1836 /*
1837  * Send periodic message to MDS renewing all currently held caps.  The
1838  * ack will reset the expiration for all caps from this session.
1839  *
1840  * caller holds s_mutex
1841  */
1842 static int send_renew_caps(struct ceph_mds_client *mdsc,
1843                            struct ceph_mds_session *session)
1844 {
1845         struct ceph_msg *msg;
1846         int state;
1847
1848         if (time_after_eq(jiffies, session->s_cap_ttl) &&
1849             time_after_eq(session->s_cap_ttl, session->s_renew_requested))
1850                 pr_info("mds%d caps stale\n", session->s_mds);
1851         session->s_renew_requested = jiffies;
1852
1853         /* do not try to renew caps until a recovering mds has reconnected
1854          * with its clients. */
1855         state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
1856         if (state < CEPH_MDS_STATE_RECONNECT) {
1857                 dout("send_renew_caps ignoring mds%d (%s)\n",
1858                      session->s_mds, ceph_mds_state_name(state));
1859                 return 0;
1860         }
1861
1862         dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
1863                 ceph_mds_state_name(state));
1864         msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
1865                                       ++session->s_renew_seq);
1866         if (!msg)
1867                 return -ENOMEM;
1868         ceph_con_send(&session->s_con, msg);
1869         return 0;
1870 }
1871
1872 static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
1873                              struct ceph_mds_session *session, u64 seq)
1874 {
1875         struct ceph_msg *msg;
1876
1877         dout("send_flushmsg_ack to mds%d (%s)s seq %lld\n",
1878              session->s_mds, ceph_session_state_name(session->s_state), seq);
1879         msg = ceph_create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
1880         if (!msg)
1881                 return -ENOMEM;
1882         ceph_con_send(&session->s_con, msg);
1883         return 0;
1884 }
1885
1886
1887 /*
1888  * Note new cap ttl, and any transition from stale -> not stale (fresh?).
1889  *
1890  * Called under session->s_mutex
1891  */
1892 static void renewed_caps(struct ceph_mds_client *mdsc,
1893                          struct ceph_mds_session *session, int is_renew)
1894 {
1895         int was_stale;
1896         int wake = 0;
1897
1898         spin_lock(&session->s_cap_lock);
1899         was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);
1900
1901         session->s_cap_ttl = session->s_renew_requested +
1902                 mdsc->mdsmap->m_session_timeout*HZ;
1903
1904         if (was_stale) {
1905                 if (time_before(jiffies, session->s_cap_ttl)) {
1906                         pr_info("mds%d caps renewed\n", session->s_mds);
1907                         wake = 1;
1908                 } else {
1909                         pr_info("mds%d caps still stale\n", session->s_mds);
1910                 }
1911         }
1912         dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
1913              session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
1914              time_before(jiffies, session->s_cap_ttl) ? "fresh" : "stale");
1915         spin_unlock(&session->s_cap_lock);
1916
1917         if (wake)
1918                 wake_up_session_caps(session, RENEWCAPS);
1919 }
1920
1921 /*
1922  * send a session close request
1923  */
1924 static int request_close_session(struct ceph_mds_session *session)
1925 {
1926         struct ceph_msg *msg;
1927
1928         dout("request_close_session mds%d state %s seq %lld\n",
1929              session->s_mds, ceph_session_state_name(session->s_state),
1930              session->s_seq);
1931         msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_CLOSE,
1932                                       session->s_seq);
1933         if (!msg)
1934                 return -ENOMEM;
1935         ceph_con_send(&session->s_con, msg);
1936         return 1;
1937 }
1938
1939 /*
1940  * Called with s_mutex held.
1941  */
1942 static int __close_session(struct ceph_mds_client *mdsc,
1943                          struct ceph_mds_session *session)
1944 {
1945         if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
1946                 return 0;
1947         session->s_state = CEPH_MDS_SESSION_CLOSING;
1948         return request_close_session(session);
1949 }
1950
1951 static bool drop_negative_children(struct dentry *dentry)
1952 {
1953         struct dentry *child;
1954         bool all_negative = true;
1955
1956         if (!d_is_dir(dentry))
1957                 goto out;
1958
1959         spin_lock(&dentry->d_lock);
1960         list_for_each_entry(child, &dentry->d_subdirs, d_child) {
1961                 if (d_really_is_positive(child)) {
1962                         all_negative = false;
1963                         break;
1964                 }
1965         }
1966         spin_unlock(&dentry->d_lock);
1967
1968         if (all_negative)
1969                 shrink_dcache_parent(dentry);
1970 out:
1971         return all_negative;
1972 }
1973
1974 /*
1975  * Trim old(er) caps.
1976  *
1977  * Because we can't cache an inode without one or more caps, we do
1978  * this indirectly: if a cap is unused, we prune its aliases, at which
1979  * point the inode will hopefully get dropped too.
1980  *
1981  * Yes, this is a bit sloppy.  Our only real goal here is to respond to
1982  * memory pressure from the MDS, though, so it needn't be perfect.
1983  */
1984 static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
1985 {
1986         int *remaining = arg;
1987         struct ceph_inode_info *ci = ceph_inode(inode);
1988         int used, wanted, oissued, mine;
1989
1990         if (*remaining <= 0)
1991                 return -1;
1992
1993         spin_lock(&ci->i_ceph_lock);
1994         mine = cap->issued | cap->implemented;
1995         used = __ceph_caps_used(ci);
1996         wanted = __ceph_caps_file_wanted(ci);
1997         oissued = __ceph_caps_issued_other(ci, cap);
1998
1999         dout("trim_caps_cb %p cap %p mine %s oissued %s used %s wanted %s\n",
2000              inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
2001              ceph_cap_string(used), ceph_cap_string(wanted));
2002         if (cap == ci->i_auth_cap) {
2003                 if (ci->i_dirty_caps || ci->i_flushing_caps ||
2004                     !list_empty(&ci->i_cap_snaps))
2005                         goto out;
2006                 if ((used | wanted) & CEPH_CAP_ANY_WR)
2007                         goto out;
2008                 /* Note: it's possible that i_filelock_ref becomes non-zero
2009                  * after dropping auth caps. It doesn't hurt, because the
2010                  * reply to the file lock mds request will re-add auth caps. */
2011                 if (atomic_read(&ci->i_filelock_ref) > 0)
2012                         goto out;
2013         }
2014         /* The inode has cached pages, but it's no longer used.
2015          * We can safely drop it. */
2016         if (S_ISREG(inode->i_mode) &&
2017             wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
2018             !(oissued & CEPH_CAP_FILE_CACHE)) {
2019                 used = 0;
2020                 oissued = 0;
2021         }
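        /*
         * Keep the cap if it supplies any bits that are in use or wanted
         * and that no other cap on this inode can provide.
         */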
2022         if ((used | wanted) & ~oissued & mine)
2023                 goto out;   /* we need these caps */
2024
2025         if (oissued) {
2026                 /* we aren't the only cap.. just remove us */
2027                 __ceph_remove_cap(cap, true);
2028                 (*remaining)--;
2029         } else {
2030                 struct dentry *dentry;
2031                 /* try dropping referring dentries */
2032                 spin_unlock(&ci->i_ceph_lock);
2033                 dentry = d_find_any_alias(inode);
2034                 if (dentry && drop_negative_children(dentry)) {
2035                         int count;
2036                         dput(dentry);
2037                         d_prune_aliases(inode);
2038                         count = atomic_read(&inode->i_count);
2039                         if (count == 1)
2040                                 (*remaining)--;
2041                         dout("trim_caps_cb %p cap %p pruned, count now %d\n",
2042                              inode, cap, count);
2043                 } else {
2044                         dput(dentry);
2045                 }
2046                 return 0;
2047         }
2048
2049 out:
2050         spin_unlock(&ci->i_ceph_lock);
2051         return 0;
2052 }
2053
2054 /*
2055  * Trim session cap count down to some max number.
2056  */
2057 int ceph_trim_caps(struct ceph_mds_client *mdsc,
2058                    struct ceph_mds_session *session,
2059                    int max_caps)
2060 {
2061         int trim_caps = session->s_nr_caps - max_caps;
2062
2063         dout("trim_caps mds%d start: %d / %d, trim %d\n",
2064              session->s_mds, session->s_nr_caps, max_caps, trim_caps);
2065         if (trim_caps > 0) {
2066                 int remaining = trim_caps;
2067
2068                 ceph_iterate_session_caps(session, trim_caps_cb, &remaining);
2069                 dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
2070                      session->s_mds, session->s_nr_caps, max_caps,
2071                         trim_caps - remaining);
2072         }
2073
2074         ceph_flush_cap_releases(mdsc, session);
2075         return 0;
2076 }
2077
2078 static int check_caps_flush(struct ceph_mds_client *mdsc,
2079                             u64 want_flush_tid)
2080 {
2081         int ret = 1;
2082
2083         spin_lock(&mdsc->cap_dirty_lock);
2084         if (!list_empty(&mdsc->cap_flush_list)) {
2085                 struct ceph_cap_flush *cf =
2086                         list_first_entry(&mdsc->cap_flush_list,
2087                                          struct ceph_cap_flush, g_list);
2088                 if (cf->tid <= want_flush_tid) {
2089                         dout("check_caps_flush still flushing tid "
2090                              "%llu <= %llu\n", cf->tid, want_flush_tid);
2091                         ret = 0;
2092                 }
2093         }
2094         spin_unlock(&mdsc->cap_dirty_lock);
2095         return ret;
2096 }
2097
2098 /*
2099  * wait for all pending cap flushes to complete.
2100  *
2101  * returns once we've flushed through want_flush_tid.
2102  */
2103 static void wait_caps_flush(struct ceph_mds_client *mdsc,
2104                             u64 want_flush_tid)
2105 {
2106         dout("check_caps_flush want %llu\n", want_flush_tid);
2107
2108         wait_event(mdsc->cap_flushing_wq,
2109                    check_caps_flush(mdsc, want_flush_tid));
2110
2111         dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid);
2112 }
2113
2114 /*
2115  * called under s_mutex
2116  */
2117 static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
2118                                    struct ceph_mds_session *session)
2119 {
2120         struct ceph_msg *msg = NULL;
2121         struct ceph_mds_cap_release *head;
2122         struct ceph_mds_cap_item *item;
2123         struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
2124         struct ceph_cap *cap;
2125         LIST_HEAD(tmp_list);
2126         int num_cap_releases;
2127         __le32  barrier, *cap_barrier;
2128
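        /*
         * Snapshot the OSD epoch barrier; it is appended to every
         * CAPRELEASE message so the MDS knows which OSD map epoch the
         * client had seen when these caps were released.
         */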
2129         down_read(&osdc->lock);
2130         barrier = cpu_to_le32(osdc->epoch_barrier);
2131         up_read(&osdc->lock);
2132
2133         spin_lock(&session->s_cap_lock);
2134 again:
2135         list_splice_init(&session->s_cap_releases, &tmp_list);
2136         num_cap_releases = session->s_num_cap_releases;
2137         session->s_num_cap_releases = 0;
2138         spin_unlock(&session->s_cap_lock);
2139
2140         while (!list_empty(&tmp_list)) {
2141                 if (!msg) {
2142                         msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
2143                                         PAGE_SIZE, GFP_NOFS, false);
2144                         if (!msg)
2145                                 goto out_err;
2146                         head = msg->front.iov_base;
2147                         head->num = cpu_to_le32(0);
2148                         msg->front.iov_len = sizeof(*head);
2149
2150                         msg->hdr.version = cpu_to_le16(2);
2151                         msg->hdr.compat_version = cpu_to_le16(1);
2152                 }
2153
2154                 cap = list_first_entry(&tmp_list, struct ceph_cap,
2155                                         session_caps);
2156                 list_del(&cap->session_caps);
2157                 num_cap_releases--;
2158
2159                 head = msg->front.iov_base;
2160                 put_unaligned_le32(get_unaligned_le32(&head->num) + 1,
2161                                    &head->num);
2162                 item = msg->front.iov_base + msg->front.iov_len;
2163                 item->ino = cpu_to_le64(cap->cap_ino);
2164                 item->cap_id = cpu_to_le64(cap->cap_id);
2165                 item->migrate_seq = cpu_to_le32(cap->mseq);
2166                 item->seq = cpu_to_le32(cap->issue_seq);
2167                 msg->front.iov_len += sizeof(*item);
2168
2169                 ceph_put_cap(mdsc, cap);
2170
2171                 if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
2172                         /* append the cap_barrier field */
2173                         cap_barrier = msg->front.iov_base + msg->front.iov_len;
2174                         *cap_barrier = barrier;
2175                         msg->front.iov_len += sizeof(*cap_barrier);
2176
2177                         msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2178                         dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
2179                         ceph_con_send(&session->s_con, msg);
2180                         msg = NULL;
2181                 }
2182         }
2183
2184         BUG_ON(num_cap_releases != 0);
2185
2186         spin_lock(&session->s_cap_lock);
2187         if (!list_empty(&session->s_cap_releases))
2188                 goto again;
2189         spin_unlock(&session->s_cap_lock);
2190
2191         if (msg) {
2192                 /* append the cap_barrier field */
2193                 cap_barrier = msg->front.iov_base + msg->front.iov_len;
2194                 *cap_barrier = barrier;
2195                 msg->front.iov_len += sizeof(*cap_barrier);
2196
2197                 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2198                 dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
2199                 ceph_con_send(&session->s_con, msg);
2200         }
2201         return;
2202 out_err:
2203         pr_err("send_cap_releases mds%d, failed to allocate message\n",
2204                 session->s_mds);
2205         spin_lock(&session->s_cap_lock);
2206         list_splice(&tmp_list, &session->s_cap_releases);
2207         session->s_num_cap_releases += num_cap_releases;
2208         spin_unlock(&session->s_cap_lock);
2209 }
2210
2211 static void ceph_cap_release_work(struct work_struct *work)
2212 {
2213         struct ceph_mds_session *session =
2214                 container_of(work, struct ceph_mds_session, s_cap_release_work);
2215
2216         mutex_lock(&session->s_mutex);
2217         if (session->s_state == CEPH_MDS_SESSION_OPEN ||
2218             session->s_state == CEPH_MDS_SESSION_HUNG)
2219                 ceph_send_cap_releases(session->s_mdsc, session);
2220         mutex_unlock(&session->s_mutex);
2221         ceph_put_mds_session(session);
2222 }
2223
2224 void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
2225                              struct ceph_mds_session *session)
2226 {
2227         if (mdsc->stopping)
2228                 return;
2229
2230         ceph_get_mds_session(session);
2231         if (queue_work(mdsc->fsc->cap_wq,
2232                        &session->s_cap_release_work)) {
2233                 dout("cap release work queued\n");
2234         } else {
2235                 ceph_put_mds_session(session);
2236                 dout("failed to queue cap release work\n");
2237         }
2238 }
2239
2240 /*
2241  * caller holds session->s_cap_lock
2242  */
2243 void __ceph_queue_cap_release(struct ceph_mds_session *session,
2244                               struct ceph_cap *cap)
2245 {
2246         list_add_tail(&cap->session_caps, &session->s_cap_releases);
2247         session->s_num_cap_releases++;
2248
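        /* flush each time a full CAPRELEASE message's worth has accumulated */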
2249         if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE))
2250                 ceph_flush_cap_releases(session->s_mdsc, session);
2251 }
2252
2253 static void ceph_cap_reclaim_work(struct work_struct *work)
2254 {
2255         struct ceph_mds_client *mdsc =
2256                 container_of(work, struct ceph_mds_client, cap_reclaim_work);
2257         int ret = ceph_trim_dentries(mdsc);
2258         if (ret == -EAGAIN)
2259                 ceph_queue_cap_reclaim_work(mdsc);
2260 }
2261
2262 void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc)
2263 {
2264         if (mdsc->stopping)
2265                 return;
2266
2267         if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) {
2268                 dout("caps reclaim work queued\n");
2269         } else {
2270                 dout("failed to queue caps release work\n");
2271         }
2272 }
2273
2274 void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr)
2275 {
2276         int val;
2277         if (!nr)
2278                 return;
2279         val = atomic_add_return(nr, &mdsc->cap_reclaim_pending);
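        /*
         * Queue the reclaim work each time the pending count crosses a
         * multiple of CEPH_CAPS_PER_RELEASE.
         */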
2280         if ((val % CEPH_CAPS_PER_RELEASE) < nr) {
2281                 atomic_set(&mdsc->cap_reclaim_pending, 0);
2282                 ceph_queue_cap_reclaim_work(mdsc);
2283         }
2284 }
2285
2286 /*
2287  * requests
2288  */
2289
2290 int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
2291                                     struct inode *dir)
2292 {
2293         struct ceph_inode_info *ci = ceph_inode(dir);
2294         struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
2295         struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options;
2296         size_t size = sizeof(struct ceph_mds_reply_dir_entry);
2297         unsigned int num_entries;
2298         int order;
2299
2300         spin_lock(&ci->i_ceph_lock);
2301         num_entries = ci->i_files + ci->i_subdirs;
2302         spin_unlock(&ci->i_ceph_lock);
2303         num_entries = max(num_entries, 1U);
2304         num_entries = min(num_entries, opt->max_readdir);
2305
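        /*
         * Try to allocate a buffer large enough for the expected entry
         * count, falling back to progressively smaller orders if
         * high-order pages are unavailable.
         */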
2306         order = get_order(size * num_entries);
2307         while (order >= 0) {
2308                 rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL |
2309                                                              __GFP_NOWARN,
2310                                                              order);
2311                 if (rinfo->dir_entries)
2312                         break;
2313                 order--;
2314         }
2315         if (!rinfo->dir_entries)
2316                 return -ENOMEM;
2317
2318         num_entries = (PAGE_SIZE << order) / size;
2319         num_entries = min(num_entries, opt->max_readdir);
2320
2321         rinfo->dir_buf_size = PAGE_SIZE << order;
2322         req->r_num_caps = num_entries + 1;
2323         req->r_args.readdir.max_entries = cpu_to_le32(num_entries);
2324         req->r_args.readdir.max_bytes = cpu_to_le32(opt->max_readdir_bytes);
2325         return 0;
2326 }
2327
2328 /*
2329  * Create an mds request.
2330  */
2331 struct ceph_mds_request *
2332 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
2333 {
2334         struct ceph_mds_request *req;
2335
2336         req = kmem_cache_zalloc(ceph_mds_request_cachep, GFP_NOFS);
2337         if (!req)
2338                 return ERR_PTR(-ENOMEM);
2339
2340         mutex_init(&req->r_fill_mutex);
2341         req->r_mdsc = mdsc;
2342         req->r_started = jiffies;
2343         req->r_start_latency = ktime_get();
2344         req->r_resend_mds = -1;
2345         INIT_LIST_HEAD(&req->r_unsafe_dir_item);
2346         INIT_LIST_HEAD(&req->r_unsafe_target_item);
2347         req->r_fmode = -1;
2348         kref_init(&req->r_kref);
2349         RB_CLEAR_NODE(&req->r_node);
2350         INIT_LIST_HEAD(&req->r_wait);
2351         init_completion(&req->r_completion);
2352         init_completion(&req->r_safe_completion);
2353         INIT_LIST_HEAD(&req->r_unsafe_item);
2354
2355         ktime_get_coarse_real_ts64(&req->r_stamp);
2356
2357         req->r_op = op;
2358         req->r_direct_mode = mode;
2359         return req;
2360 }
2361
2362 /*
2363  * return the oldest (lowest tid) request in the request tree, or NULL if none.
2364  *
2365  * called under mdsc->mutex.
2366  */
2367 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
2368 {
2369         if (RB_EMPTY_ROOT(&mdsc->request_tree))
2370                 return NULL;
2371         return rb_entry(rb_first(&mdsc->request_tree),
2372                         struct ceph_mds_request, r_node);
2373 }
2374
2375 static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
2376 {
2377         return mdsc->oldest_tid;
2378 }
2379
2380 /*
2381  * Build a dentry's path.  Allocate on heap; caller must kfree.  Based
2382  * on build_path_from_dentry in fs/cifs/dir.c.
2383  *
2384  * If @stop_on_nosnap, generate path relative to the first non-snapped
2385  * inode.
2386  *
2387  * Encode hidden .snap dirs as a double /, i.e.
2388  *   foo/.snap/bar -> foo//bar
2389  */
2390 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *pbase,
2391                            int stop_on_nosnap)
2392 {
2393         struct dentry *temp;
2394         char *path;
2395         int pos;
2396         unsigned seq;
2397         u64 base;
2398
2399         if (!dentry)
2400                 return ERR_PTR(-EINVAL);
2401
2402         path = __getname();
2403         if (!path)
2404                 return ERR_PTR(-ENOMEM);
2405 retry:
2406         pos = PATH_MAX - 1;
2407         path[pos] = '\0';
2408
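        /*
         * Build the path backwards from the tail of the buffer, walking
         * up d_parent under RCU; if a rename races with us, the seqlock
         * check below sends us back to retry.
         */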
2409         seq = read_seqbegin(&rename_lock);
2410         rcu_read_lock();
2411         temp = dentry;
2412         for (;;) {
2413                 struct inode *inode;
2414
2415                 spin_lock(&temp->d_lock);
2416                 inode = d_inode(temp);
2417                 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
2418                         dout("build_path path+%d: %p SNAPDIR\n",
2419                              pos, temp);
2420                 } else if (stop_on_nosnap && inode && dentry != temp &&
2421                            ceph_snap(inode) == CEPH_NOSNAP) {
2422                         spin_unlock(&temp->d_lock);
2423                         pos++; /* get rid of any prepended '/' */
2424                         break;
2425                 } else {
2426                         pos -= temp->d_name.len;
2427                         if (pos < 0) {
2428                                 spin_unlock(&temp->d_lock);
2429                                 break;
2430                         }
2431                         memcpy(path + pos, temp->d_name.name, temp->d_name.len);
2432                 }
2433                 spin_unlock(&temp->d_lock);
2434                 temp = READ_ONCE(temp->d_parent);
2435
2436                 /* Are we at the root? */
2437                 if (IS_ROOT(temp))
2438                         break;
2439
2440                 /* Are we out of buffer? */
2441                 if (--pos < 0)
2442                         break;
2443
2444                 path[pos] = '/';
2445         }
2446         base = ceph_ino(d_inode(temp));
2447         rcu_read_unlock();
2448
2449         if (read_seqretry(&rename_lock, seq))
2450                 goto retry;
2451
2452         if (pos < 0) {
2453                 /*
2454                  * A rename didn't occur, but somehow we didn't end up where
2455                  * we thought we would. Throw a warning and try again.
2456                  */
2457                 pr_warn("build_path did not end path lookup where "
2458                         "expected, pos is %d\n", pos);
2459                 goto retry;
2460         }
2461
2462         *pbase = base;
2463         *plen = PATH_MAX - 1 - pos;
2464         dout("build_path on %p %d built %llx '%.*s'\n",
2465              dentry, d_count(dentry), base, *plen, path + pos);
2466         return path + pos;
2467 }
2468
2469 static int build_dentry_path(struct dentry *dentry, struct inode *dir,
2470                              const char **ppath, int *ppathlen, u64 *pino,
2471                              bool *pfreepath, bool parent_locked)
2472 {
2473         char *path;
2474
2475         rcu_read_lock();
2476         if (!dir)
2477                 dir = d_inode_rcu(dentry->d_parent);
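        /*
         * Fast path: if the parent is stable (locked by the caller) and
         * not snapped, encode just the parent ino plus the dentry name
         * instead of building a full path.
         */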
2478         if (dir && parent_locked && ceph_snap(dir) == CEPH_NOSNAP) {
2479                 *pino = ceph_ino(dir);
2480                 rcu_read_unlock();
2481                 *ppath = dentry->d_name.name;
2482                 *ppathlen = dentry->d_name.len;
2483                 return 0;
2484         }
2485         rcu_read_unlock();
2486         path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
2487         if (IS_ERR(path))
2488                 return PTR_ERR(path);
2489         *ppath = path;
2490         *pfreepath = true;
2491         return 0;
2492 }
2493
2494 static int build_inode_path(struct inode *inode,
2495                             const char **ppath, int *ppathlen, u64 *pino,
2496                             bool *pfreepath)
2497 {
2498         struct dentry *dentry;
2499         char *path;
2500
2501         if (ceph_snap(inode) == CEPH_NOSNAP) {
2502                 *pino = ceph_ino(inode);
2503                 *ppathlen = 0;
2504                 return 0;
2505         }
2506         dentry = d_find_alias(inode);
2507         path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
2508         dput(dentry);
2509         if (IS_ERR(path))
2510                 return PTR_ERR(path);
2511         *ppath = path;
2512         *pfreepath = true;
2513         return 0;
2514 }
2515
2516 /*
2517  * request arguments may be specified via an inode *, a dentry *, or
2518  * an explicit ino+path.
2519  */
2520 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
2521                                   struct inode *rdiri, const char *rpath,
2522                                   u64 rino, const char **ppath, int *pathlen,
2523                                   u64 *ino, bool *freepath, bool parent_locked)
2524 {
2525         int r = 0;
2526
2527         if (rinode) {
2528                 r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
2529                 dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
2530                      ceph_snap(rinode));
2531         } else if (rdentry) {
2532                 r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino,
2533                                         freepath, parent_locked);
2534                 dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
2535                      *ppath);
2536         } else if (rpath || rino) {
2537                 *ino = rino;
2538                 *ppath = rpath;
2539                 *pathlen = rpath ? strlen(rpath) : 0;
2540                 dout(" path %.*s\n", *pathlen, rpath);
2541         }
2542
2543         return r;
2544 }
2545
2546 /*
2547  * called under mdsc->mutex
2548  */
2549 static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
2550                                                struct ceph_mds_request *req,
2551                                                int mds, bool drop_cap_releases)
2552 {
2553         struct ceph_msg *msg;
2554         struct ceph_mds_request_head *head;
2555         const char *path1 = NULL;
2556         const char *path2 = NULL;
2557         u64 ino1 = 0, ino2 = 0;
2558         int pathlen1 = 0, pathlen2 = 0;
2559         bool freepath1 = false, freepath2 = false;
2560         int len;
2561         u16 releases;
2562         void *p, *end;
2563         int ret;
2564
2565         ret = set_request_path_attr(req->r_inode, req->r_dentry,
2566                               req->r_parent, req->r_path1, req->r_ino1.ino,
2567                               &path1, &pathlen1, &ino1, &freepath1,
2568                               test_bit(CEPH_MDS_R_PARENT_LOCKED,
2569                                         &req->r_req_flags));
2570         if (ret < 0) {
2571                 msg = ERR_PTR(ret);
2572                 goto out;
2573         }
2574
2575         /* If r_old_dentry is set, then assume that its parent is locked */
2576         ret = set_request_path_attr(NULL, req->r_old_dentry,
2577                               req->r_old_dentry_dir,
2578                               req->r_path2, req->r_ino2.ino,
2579                               &path2, &pathlen2, &ino2, &freepath2, true);
2580         if (ret < 0) {
2581                 msg = ERR_PTR(ret);
2582                 goto out_free1;
2583         }
2584
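        /*
         * Base length: the request head, two encoded filepaths (each is
         * a one-byte version, a u64 ino and a length-prefixed string)
         * and the trailing timestamp.
         */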
2585         len = sizeof(*head) +
2586                 pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64)) +
2587                 sizeof(struct ceph_timespec);
2588
2589         /* calculate (max) length for cap releases */
2590         len += sizeof(struct ceph_mds_request_release) *
2591                 (!!req->r_inode_drop + !!req->r_dentry_drop +
2592                  !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
2593         if (req->r_dentry_drop)
2594                 len += pathlen1;
2595         if (req->r_old_dentry_drop)
2596                 len += pathlen2;
2597
2598         msg = ceph_msg_new2(CEPH_MSG_CLIENT_REQUEST, len, 1, GFP_NOFS, false);
2599         if (!msg) {
2600                 msg = ERR_PTR(-ENOMEM);
2601                 goto out_free2;
2602         }
2603
2604         msg->hdr.version = cpu_to_le16(2);
2605         msg->hdr.tid = cpu_to_le64(req->r_tid);
2606
2607         head = msg->front.iov_base;
2608         p = msg->front.iov_base + sizeof(*head);
2609         end = msg->front.iov_base + msg->front.iov_len;
2610
2611         head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
2612         head->op = cpu_to_le32(req->r_op);
2613         head->caller_uid = cpu_to_le32(from_kuid(&init_user_ns, req->r_uid));
2614         head->caller_gid = cpu_to_le32(from_kgid(&init_user_ns, req->r_gid));
2615         head->ino = cpu_to_le64(req->r_deleg_ino);
2616         head->args = req->r_args;
2617
2618         ceph_encode_filepath(&p, end, ino1, path1);
2619         ceph_encode_filepath(&p, end, ino2, path2);
2620
2621         /* make note of release offset, in case we need to replay */
2622         req->r_request_release_offset = p - msg->front.iov_base;
2623
2624         /* cap releases */
2625         releases = 0;
2626         if (req->r_inode_drop)
2627                 releases += ceph_encode_inode_release(&p,
2628                       req->r_inode ? req->r_inode : d_inode(req->r_dentry),
2629                       mds, req->r_inode_drop, req->r_inode_unless,
2630                       req->r_op == CEPH_MDS_OP_READDIR);
2631         if (req->r_dentry_drop)
2632                 releases += ceph_encode_dentry_release(&p, req->r_dentry,
2633                                 req->r_parent, mds, req->r_dentry_drop,
2634                                 req->r_dentry_unless);
2635         if (req->r_old_dentry_drop)
2636                 releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
2637                                 req->r_old_dentry_dir, mds,
2638                                 req->r_old_dentry_drop,
2639                                 req->r_old_dentry_unless);
2640         if (req->r_old_inode_drop)
2641                 releases += ceph_encode_inode_release(&p,
2642                       d_inode(req->r_old_dentry),
2643                       mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
2644
2645         if (drop_cap_releases) {
2646                 releases = 0;
2647                 p = msg->front.iov_base + req->r_request_release_offset;
2648         }
2649
2650         head->num_releases = cpu_to_le16(releases);
2651
2652         /* time stamp */
2653         {
2654                 struct ceph_timespec ts;
2655                 ceph_encode_timespec64(&ts, &req->r_stamp);
2656                 ceph_encode_copy(&p, &ts, sizeof(ts));
2657         }
2658
2659         if (WARN_ON_ONCE(p > end)) {
2660                 ceph_msg_put(msg);
2661                 msg = ERR_PTR(-ERANGE);
2662                 goto out_free2;
2663         }
2664
2665         msg->front.iov_len = p - msg->front.iov_base;
2666         msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2667
2668         if (req->r_pagelist) {
2669                 struct ceph_pagelist *pagelist = req->r_pagelist;
2670                 ceph_msg_data_add_pagelist(msg, pagelist);
2671                 msg->hdr.data_len = cpu_to_le32(pagelist->length);
2672         } else {
2673                 msg->hdr.data_len = 0;
2674         }
2675
2676         msg->hdr.data_off = cpu_to_le16(0);
2677
2678 out_free2:
2679         if (freepath2)
2680                 ceph_mdsc_free_path((char *)path2, pathlen2);
2681 out_free1:
2682         if (freepath1)
2683                 ceph_mdsc_free_path((char *)path1, pathlen1);
2684 out:
2685         return msg;
2686 }
2687
2688 /*
2689  * called under mdsc->mutex on error, and with no mutex held on
2690  * success.
2691  */
2692 static void complete_request(struct ceph_mds_client *mdsc,
2693                              struct ceph_mds_request *req)
2694 {
2695         req->r_end_latency = ktime_get();
2696
2697         if (req->r_callback)
2698                 req->r_callback(mdsc, req);
2699         complete_all(&req->r_completion);
2700 }
2701
2702 /*
2703  * called under mdsc->mutex
2704  */
2705 static int __prepare_send_request(struct ceph_mds_client *mdsc,
2706                                   struct ceph_mds_request *req,
2707                                   int mds, bool drop_cap_releases)
2708 {
2709         struct ceph_mds_request_head *rhead;
2710         struct ceph_msg *msg;
2711         int flags = 0;
2712
2713         req->r_attempts++;
2714         if (req->r_inode) {
2715                 struct ceph_cap *cap =
2716                         ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
2717
2718                 if (cap)
2719                         req->r_sent_on_mseq = cap->mseq;
2720                 else
2721                         req->r_sent_on_mseq = -1;
2722         }
2723         dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
2724              req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
2725
2726         if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
2727                 void *p;
2728                 /*
2729                  * Replay.  Do not regenerate message (and rebuild
2730                  * paths, etc.); just use the original message.
2731                  * Rebuilding paths will break for renames because
2732                  * d_move mangles the src name.
2733                  */
2734                 msg = req->r_request;
2735                 rhead = msg->front.iov_base;
2736
2737                 flags = le32_to_cpu(rhead->flags);
2738                 flags |= CEPH_MDS_FLAG_REPLAY;
2739                 rhead->flags = cpu_to_le32(flags);
2740
2741                 if (req->r_target_inode)
2742                         rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
2743
2744                 rhead->num_retry = req->r_attempts - 1;
2745
2746                 /* remove cap/dentry releases from message */
2747                 rhead->num_releases = 0;
2748
2749                 /* time stamp */
2750                 p = msg->front.iov_base + req->r_request_release_offset;
2751                 {
2752                         struct ceph_timespec ts;
2753                         ceph_encode_timespec64(&ts, &req->r_stamp);
2754                         ceph_encode_copy(&p, &ts, sizeof(ts));
2755                 }
2756
2757                 msg->front.iov_len = p - msg->front.iov_base;
2758                 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2759                 return 0;
2760         }
2761
2762         if (req->r_request) {
2763                 ceph_msg_put(req->r_request);
2764                 req->r_request = NULL;
2765         }
2766         msg = create_request_message(mdsc, req, mds, drop_cap_releases);
2767         if (IS_ERR(msg)) {
2768                 req->r_err = PTR_ERR(msg);
2769                 return PTR_ERR(msg);
2770         }
2771         req->r_request = msg;
2772
2773         rhead = msg->front.iov_base;
2774         rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
2775         if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
2776                 flags |= CEPH_MDS_FLAG_REPLAY;
2777         if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags))
2778                 flags |= CEPH_MDS_FLAG_ASYNC;
2779         if (req->r_parent)
2780                 flags |= CEPH_MDS_FLAG_WANT_DENTRY;
2781         rhead->flags = cpu_to_le32(flags);
2782         rhead->num_fwd = req->r_num_fwd;
2783         rhead->num_retry = req->r_attempts - 1;
2784
2785         dout(" r_parent = %p\n", req->r_parent);
2786         return 0;
2787 }
2788
2789 /*
2790  * called under mdsc->mutex
2791  */
2792 static int __send_request(struct ceph_mds_client *mdsc,
2793                           struct ceph_mds_session *session,
2794                           struct ceph_mds_request *req,
2795                           bool drop_cap_releases)
2796 {
2797         int err;
2798
2799         err = __prepare_send_request(mdsc, req, session->s_mds,
2800                                      drop_cap_releases);
2801         if (!err) {
2802                 ceph_msg_get(req->r_request);
2803                 ceph_con_send(&session->s_con, req->r_request);
2804         }
2805
2806         return err;
2807 }
2808
2809 /*
2810  * send request, or put it on the appropriate wait list.
2811  */
2812 static void __do_request(struct ceph_mds_client *mdsc,
2813                         struct ceph_mds_request *req)
2814 {
2815         struct ceph_mds_session *session = NULL;
2816         int mds = -1;
2817         int err = 0;
2818         bool random;
2819
2820         if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
2821                 if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags))
2822                         __unregister_request(mdsc, req);
2823                 return;
2824         }
2825
2826         if (req->r_timeout &&
2827             time_after_eq(jiffies, req->r_started + req->r_timeout)) {
2828                 dout("do_request timed out\n");
2829                 err = -ETIMEDOUT;
2830                 goto finish;
2831         }
2832         if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) {
2833                 dout("do_request forced umount\n");
2834                 err = -EIO;
2835                 goto finish;
2836         }
2837         if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) {
2838                 if (mdsc->mdsmap_err) {
2839                         err = mdsc->mdsmap_err;
2840                         dout("do_request mdsmap err %d\n", err);
2841                         goto finish;
2842                 }
2843                 if (mdsc->mdsmap->m_epoch == 0) {
2844                         dout("do_request no mdsmap, waiting for map\n");
2845                         list_add(&req->r_wait, &mdsc->waiting_for_map);
2846                         return;
2847                 }
2848                 if (!(mdsc->fsc->mount_options->flags &
2849                       CEPH_MOUNT_OPT_MOUNTWAIT) &&
2850                     !ceph_mdsmap_is_cluster_available(mdsc->mdsmap)) {
2851                         err = -EHOSTUNREACH;
2852                         goto finish;
2853                 }
2854         }
2855
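        /*
         * Drop the session ref taken by any earlier attempt before
         * (re)choosing an mds.
         */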
2856         put_request_session(req);
2857
2858         mds = __choose_mds(mdsc, req, &random);
2859         if (mds < 0 ||
2860             ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
2861                 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
2862                         err = -EJUKEBOX;
2863                         goto finish;
2864                 }
2865                 dout("do_request no mds or not active, waiting for map\n");
2866                 list_add(&req->r_wait, &mdsc->waiting_for_map);
2867                 return;
2868         }
2869
2870         /* get, open session */
2871         session = __ceph_lookup_mds_session(mdsc, mds);
2872         if (!session) {
2873                 session = register_session(mdsc, mds);
2874                 if (IS_ERR(session)) {
2875                         err = PTR_ERR(session);
2876                         goto finish;
2877                 }
2878         }
2879         req->r_session = ceph_get_mds_session(session);
2880
2881         dout("do_request mds%d session %p state %s\n", mds, session,
2882              ceph_session_state_name(session->s_state));
2883         if (session->s_state != CEPH_MDS_SESSION_OPEN &&
2884             session->s_state != CEPH_MDS_SESSION_HUNG) {
2885                 if (session->s_state == CEPH_MDS_SESSION_REJECTED) {
2886                         err = -EACCES;
2887                         goto out_session;
2888                 }
2889                 /*
2890                  * We cannot queue async requests since the caps and delegated
2891                  * inodes are bound to the session. Just return -EJUKEBOX and
2892                  * let the caller retry a sync request in that case.
2893                  */
2894                 if (test_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags)) {
2895                         err = -EJUKEBOX;
2896                         goto out_session;
2897                 }
2898                 if (session->s_state == CEPH_MDS_SESSION_NEW ||
2899                     session->s_state == CEPH_MDS_SESSION_CLOSING) {
2900                         err = __open_session(mdsc, session);
2901                         if (err)
2902                                 goto out_session;
2903                         /* retry the same mds later */
2904                         if (random)
2905                                 req->r_resend_mds = mds;
2906                 }
2907                 list_add(&req->r_wait, &session->s_waiting);
2908                 goto out_session;
2909         }
2910
2911         /* send request */
2912         req->r_resend_mds = -1;   /* forget any previous mds hint */
2913
2914         if (req->r_request_started == 0)   /* note request start time */
2915                 req->r_request_started = jiffies;
2916
2917         err = __send_request(mdsc, session, req, false);
2918
2919 out_session:
2920         ceph_put_mds_session(session);
2921 finish:
2922         if (err) {
2923                 dout("__do_request early error %d\n", err);
2924                 req->r_err = err;
2925                 complete_request(mdsc, req);
2926                 __unregister_request(mdsc, req);
2927         }
2928         return;
2929 }
2930
2931 /*
2932  * called under mdsc->mutex
2933  */
2934 static void __wake_requests(struct ceph_mds_client *mdsc,
2935                             struct list_head *head)
2936 {
2937         struct ceph_mds_request *req;
2938         LIST_HEAD(tmp_list);
2939
2940         list_splice_init(head, &tmp_list);
2941
2942         while (!list_empty(&tmp_list)) {
2943                 req = list_entry(tmp_list.next,
2944                                  struct ceph_mds_request, r_wait);
2945                 list_del_init(&req->r_wait);
2946                 dout(" wake request %p tid %llu\n", req, req->r_tid);
2947                 __do_request(mdsc, req);
2948         }
2949 }
2950
2951 /*
2952  * Wake up threads with requests pending for @mds, so that they can
2953  * resubmit their requests to a possibly different mds.
2954  */
2955 static void kick_requests(struct ceph_mds_client *mdsc, int mds)
2956 {
2957         struct ceph_mds_request *req;
2958         struct rb_node *p = rb_first(&mdsc->request_tree);
2959
2960         dout("kick_requests mds%d\n", mds);
2961         while (p) {
2962                 req = rb_entry(p, struct ceph_mds_request, r_node);
2963                 p = rb_next(p);
2964                 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
2965                         continue;
2966                 if (req->r_attempts > 0)
2967                         continue; /* only new requests */
2968                 if (req->r_session &&
2969                     req->r_session->s_mds == mds) {
2970                         dout(" kicking tid %llu\n", req->r_tid);
2971                         list_del_init(&req->r_wait);
2972                         __do_request(mdsc, req);
2973                 }
2974         }
2975 }
2976
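/*
 * Register an MDS request and send it off, without waiting for the
 * reply.  Pins the inodes involved via CAP_PIN references and waits
 * for any pending async creates they depend on to complete first.
 */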
2977 int ceph_mdsc_submit_request(struct ceph_mds_client *mdsc, struct inode *dir,
2978                               struct ceph_mds_request *req)
2979 {
2980         int err = 0;
2981
2982         /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry_dir */
2983         if (req->r_inode)
2984                 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
2985         if (req->r_parent) {
2986                 struct ceph_inode_info *ci = ceph_inode(req->r_parent);
2987                 int fmode = (req->r_op & CEPH_MDS_OP_WRITE) ?
2988                             CEPH_FILE_MODE_WR : CEPH_FILE_MODE_RD;
2989                 spin_lock(&ci->i_ceph_lock);
2990                 ceph_take_cap_refs(ci, CEPH_CAP_PIN, false);
2991                 __ceph_touch_fmode(ci, mdsc, fmode);
2992                 spin_unlock(&ci->i_ceph_lock);
2993                 ihold(req->r_parent);
2994         }
2995         if (req->r_old_dentry_dir)
2996                 ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
2997                                   CEPH_CAP_PIN);
2998
2999         if (req->r_inode) {
3000                 err = ceph_wait_on_async_create(req->r_inode);
3001                 if (err) {
3002                         dout("%s: wait for async create returned: %d\n",
3003                              __func__, err);
3004                         return err;
3005                 }
3006         }
3007
3008         if (!err && req->r_old_inode) {
3009                 err = ceph_wait_on_async_create(req->r_old_inode);
3010                 if (err) {
3011                         dout("%s: wait for async create returned: %d\n",
3012                              __func__, err);
3013                         return err;
3014                 }
3015         }
3016
3017         dout("submit_request on %p for inode %p\n", req, dir);
3018         mutex_lock(&mdsc->mutex);
3019         __register_request(mdsc, req, dir);
3020         __do_request(mdsc, req);
3021         err = req->r_err;
3022         mutex_unlock(&mdsc->mutex);
3023         return err;
3024 }
3025
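/*
 * Wait for a submitted request to complete, get killed, or time out,
 * then pick up the result.  An interrupted or timed-out request is
 * marked aborted under r_fill_mutex so that a late reply won't touch
 * state (e.g. the dir mutex) our caller has already released.
 */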
3026 static int ceph_mdsc_wait_request(struct ceph_mds_client *mdsc,
3027                                   struct ceph_mds_request *req)
3028 {
3029         int err;
3030
3031         /* wait */
3032         dout("do_request waiting\n");
3033         if (!req->r_timeout && req->r_wait_for_completion) {
3034                 err = req->r_wait_for_completion(mdsc, req);
3035         } else {
3036                 long timeleft = wait_for_completion_killable_timeout(
3037                                         &req->r_completion,
3038                                         ceph_timeout_jiffies(req->r_timeout));
3039                 if (timeleft > 0)
3040                         err = 0;
3041                 else if (!timeleft)
3042                         err = -ETIMEDOUT;  /* timed out */
3043                 else
3044                         err = timeleft;  /* killed */
3045         }
3046         dout("do_request waited, got %d\n", err);
3047         mutex_lock(&mdsc->mutex);
3048
3049         /* only abort if we didn't race with a real reply */
3050         if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) {
3051                 err = le32_to_cpu(req->r_reply_info.head->result);
3052         } else if (err < 0) {
3053                 dout("aborted request %lld with %d\n", req->r_tid, err);
3054
3055                 /*
3056                  * ensure we aren't running concurrently with
3057                  * ceph_fill_trace or ceph_readdir_prepopulate, which
3058                  * rely on locks (dir mutex) held by our caller.
3059                  */
3060                 mutex_lock(&req->r_fill_mutex);
3061                 req->r_err = err;
3062                 set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
3063                 mutex_unlock(&req->r_fill_mutex);
3064
3065                 if (req->r_parent &&
3066                     (req->r_op & CEPH_MDS_OP_WRITE))
3067                         ceph_invalidate_dir_request(req);
3068         } else {
3069                 err = req->r_err;
3070         }
3071
3072         mutex_unlock(&mdsc->mutex);
3073         return err;
3074 }
3075
3076 /*
3077  * Synchronously perform an mds request.  Take care of all of the
3078  * session setup, forwarding, and retry details.
3079  */
3080 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
3081                          struct inode *dir,
3082                          struct ceph_mds_request *req)
3083 {
3084         int err;
3085
3086         dout("do_request on %p\n", req);
3087
3088         /* issue */
3089         err = ceph_mdsc_submit_request(mdsc, dir, req);
3090         if (!err)
3091                 err = ceph_mdsc_wait_request(mdsc, req);
3092         dout("do_request %p done, result %d\n", req, err);
3093         return err;
3094 }
3095
3096 /*
3097  * Invalidate dir's completeness, dentry lease state on an aborted MDS
3098  * namespace request.
3099  */
3100 void ceph_invalidate_dir_request(struct ceph_mds_request *req)
3101 {
3102         struct inode *dir = req->r_parent;
3103         struct inode *old_dir = req->r_old_dentry_dir;
3104
3105         dout("invalidate_dir_request %p %p (complete, lease(s))\n", dir, old_dir);
3106
3107         ceph_dir_clear_complete(dir);
3108         if (old_dir)
3109                 ceph_dir_clear_complete(old_dir);
3110         if (req->r_dentry)
3111                 ceph_invalidate_dentry_lease(req->r_dentry);
3112         if (req->r_old_dentry)
3113                 ceph_invalidate_dentry_lease(req->r_old_dentry);
3114 }
3115
3116 /*
3117  * Handle mds reply.
3118  *
3119  * We take the session mutex and parse and process the reply immediately.
3120  * This preserves the logical ordering of replies, capabilities, etc., sent
3121  * by the MDS as they are applied to our local cache.
3122  */
3123 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
3124 {
3125         struct ceph_mds_client *mdsc = session->s_mdsc;
3126         struct ceph_mds_request *req;
3127         struct ceph_mds_reply_head *head = msg->front.iov_base;
3128         struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
3129         struct ceph_snap_realm *realm;
3130         u64 tid;
3131         int err, result;
3132         int mds = session->s_mds;
3133
3134         if (msg->front.iov_len < sizeof(*head)) {
3135                 pr_err("mdsc_handle_reply got corrupt (short) reply\n");
3136                 ceph_msg_dump(msg);
3137                 return;
3138         }
3139
3140         /* get request, session */
3141         tid = le64_to_cpu(msg->hdr.tid);
3142         mutex_lock(&mdsc->mutex);
3143         req = lookup_get_request(mdsc, tid);
3144         if (!req) {
3145                 dout("handle_reply on unknown tid %llu\n", tid);
3146                 mutex_unlock(&mdsc->mutex);
3147                 return;
3148         }
3149         dout("handle_reply %p\n", req);
3150
3151         /* correct session? */
3152         if (req->r_session != session) {
3153                 pr_err("mdsc_handle_reply got %llu on session mds%d not mds%d\n",
3154                        tid, session->s_mds,
3155                        req->r_session ? req->r_session->s_mds : -1);
3156                 mutex_unlock(&mdsc->mutex);
3157                 goto out;
3158         }
3159
3160         /* dup? */
3161         if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) ||
3162             (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) {
3163                 pr_warn("got a dup %s reply on %llu from mds%d\n",
3164                            head->safe ? "safe" : "unsafe", tid, mds);
3165                 mutex_unlock(&mdsc->mutex);
3166                 goto out;
3167         }
3168         if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) {
3169                 pr_warn("got unsafe after safe on %llu from mds%d\n",
3170                            tid, mds);
3171                 mutex_unlock(&mdsc->mutex);
3172                 goto out;
3173         }
3174
3175         result = le32_to_cpu(head->result);
3176
3177         /*
3178          * Handle an ESTALE:
3179          * if we're not talking to the authority, send the request there;
3180          * if the authority has changed while we weren't looking,
3181          * send it to the new authority.
3182          * Otherwise we just have to return an ESTALE.
3183          */
3184         if (result == -ESTALE) {
3185                 dout("got ESTALE on request %llu\n", req->r_tid);
3186                 req->r_resend_mds = -1;
3187                 if (req->r_direct_mode != USE_AUTH_MDS) {
3188                         dout("not using auth, setting for that now\n");
3189                         req->r_direct_mode = USE_AUTH_MDS;
3190                         __do_request(mdsc, req);
3191                         mutex_unlock(&mdsc->mutex);
3192                         goto out;
3193                 } else  {
3194                         int mds = __choose_mds(mdsc, req, NULL);
3195                         if (mds >= 0 && mds != req->r_session->s_mds) {
3196                                 dout("but auth changed, so resending\n");
3197                                 __do_request(mdsc, req);
3198                                 mutex_unlock(&mdsc->mutex);
3199                                 goto out;
3200                         }
3201                 }
3202                 dout("have to return ESTALE on request %llu\n", req->r_tid);
3203         }
3204
3205
3206         if (head->safe) {
3207                 set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags);
3208                 __unregister_request(mdsc, req);
3209
3210                 /* last request during umount? */
3211                 if (mdsc->stopping && !__get_oldest_req(mdsc))
3212                         complete_all(&mdsc->safe_umount_waiters);
3213
3214                 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
3215                         /*
3216                          * We already handled the unsafe response, now do the
3217                          * cleanup.  No need to examine the response; the MDS
3218                          * doesn't include any result info in the safe
3219                          * response.  And even if it did, there is nothing
3220                          * useful we could do with a revised return value.
3221                          */
3222                         dout("got safe reply %llu, mds%d\n", tid, mds);
3223
3224                         mutex_unlock(&mdsc->mutex);
3225                         goto out;
3226                 }
3227         } else {
3228                 set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags);
3229                 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
3230         }
3231
3232         dout("handle_reply tid %lld result %d\n", tid, result);
3233         rinfo = &req->r_reply_info;
3234         if (test_bit(CEPHFS_FEATURE_REPLY_ENCODING, &session->s_features))
3235                 err = parse_reply_info(session, msg, rinfo, (u64)-1);
3236         else
3237                 err = parse_reply_info(session, msg, rinfo, session->s_con.peer_features);
3238         mutex_unlock(&mdsc->mutex);
3239
3240         mutex_lock(&session->s_mutex);
3241         if (err < 0) {
3242                 pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
3243                 ceph_msg_dump(msg);
3244                 goto out_err;
3245         }
3246
3247         /* snap trace */
3248         realm = NULL;
3249         if (rinfo->snapblob_len) {
3250                 down_write(&mdsc->snap_rwsem);
3251                 ceph_update_snap_trace(mdsc, rinfo->snapblob,
3252                                 rinfo->snapblob + rinfo->snapblob_len,
3253                                 le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
3254                                 &realm);
3255                 downgrade_write(&mdsc->snap_rwsem);
3256         } else {
3257                 down_read(&mdsc->snap_rwsem);
3258         }
3259
3260         /* insert trace into our cache */
3261         mutex_lock(&req->r_fill_mutex);
3262         current->journal_info = req;
3263         err = ceph_fill_trace(mdsc->fsc->sb, req);
3264         if (err == 0) {
3265                 if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
3266                                     req->r_op == CEPH_MDS_OP_LSSNAP))
3267                         ceph_readdir_prepopulate(req, req->r_session);
3268         }
3269         current->journal_info = NULL;
3270         mutex_unlock(&req->r_fill_mutex);
3271
3272         up_read(&mdsc->snap_rwsem);
3273         if (realm)
3274                 ceph_put_snap_realm(mdsc, realm);
3275
3276         if (err == 0) {
3277                 if (req->r_target_inode &&
3278                     test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
3279                         struct ceph_inode_info *ci =
3280                                 ceph_inode(req->r_target_inode);
3281                         spin_lock(&ci->i_unsafe_lock);
3282                         list_add_tail(&req->r_unsafe_target_item,
3283                                       &ci->i_unsafe_iops);
3284                         spin_unlock(&ci->i_unsafe_lock);
3285                 }
3286
3287                 ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
3288         }
3289 out_err:
3290         mutex_lock(&mdsc->mutex);
3291         if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
3292                 if (err) {
3293                         req->r_err = err;
3294                 } else {
3295                         req->r_reply = ceph_msg_get(msg);
3296                         set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags);
3297                 }
3298         } else {
3299                 dout("reply arrived after request %lld was aborted\n", tid);
3300         }
3301         mutex_unlock(&mdsc->mutex);
3302
3303         mutex_unlock(&session->s_mutex);
3304
3305         /* kick calling process */
3306         complete_request(mdsc, req);
3307
3308         ceph_update_metadata_latency(&mdsc->metric, req->r_start_latency,
3309                                      req->r_end_latency, err);
3310 out:
3311         ceph_mdsc_put_request(req);
3312         return;
3313 }
3314
3315
3316
3317 /*
3318  * handle mds notification that our request has been forwarded.
3319  */
3320 static void handle_forward(struct ceph_mds_client *mdsc,
3321                            struct ceph_mds_session *session,
3322                            struct ceph_msg *msg)
3323 {
3324         struct ceph_mds_request *req;
3325         u64 tid = le64_to_cpu(msg->hdr.tid);
3326         u32 next_mds;
3327         u32 fwd_seq;
3328         int err = -EINVAL;
3329         void *p = msg->front.iov_base;
3330         void *end = p + msg->front.iov_len;
3331
3332         ceph_decode_need(&p, end, 2*sizeof(u32), bad);
3333         next_mds = ceph_decode_32(&p);
3334         fwd_seq = ceph_decode_32(&p);
3335
3336         mutex_lock(&mdsc->mutex);
3337         req = lookup_get_request(mdsc, tid);
3338         if (!req) {
3339                 dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
3340                 goto out;  /* dup reply? */
3341         }
3342
3343         if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
3344                 dout("forward tid %llu aborted, unregistering\n", tid);
3345                 __unregister_request(mdsc, req);
3346         } else if (fwd_seq <= req->r_num_fwd) {
3347                 dout("forward tid %llu to mds%d - old seq %d <= %d\n",
3348                      tid, next_mds, fwd_seq, req->r_num_fwd);
3349         } else {
3350                 /* resend. forward race not possible; mds would drop */
3351                 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
3352                 BUG_ON(req->r_err);
3353                 BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags));
3354                 req->r_attempts = 0;
3355                 req->r_num_fwd = fwd_seq;
3356                 req->r_resend_mds = next_mds;
3357                 put_request_session(req);
3358                 __do_request(mdsc, req);
3359         }
3360         ceph_mdsc_put_request(req);
3361 out:
3362         mutex_unlock(&mdsc->mutex);
3363         return;
3364
3365 bad:
3366         pr_err("mdsc_handle_forward decode error err=%d\n", err);
3367 }
3368
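/*
 * Scan the client metadata map in a session message, noting whether
 * the MDS's error_string indicates that we have been blocklisted.
 */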
3369 static int __decode_session_metadata(void **p, void *end,
3370                                      bool *blocklisted)
3371 {
3372         /* map<string,string> */
3373         u32 n;
3374         bool err_str;
3375         ceph_decode_32_safe(p, end, n, bad);
3376         while (n-- > 0) {
3377                 u32 len;
3378                 ceph_decode_32_safe(p, end, len, bad);
3379                 ceph_decode_need(p, end, len, bad);
3380                 err_str = !strncmp(*p, "error_string", len);
3381                 *p += len;
3382                 ceph_decode_32_safe(p, end, len, bad);
3383                 ceph_decode_need(p, end, len, bad);
3384                 /*
3385                  * Match "blocklisted (blacklisted)" from newer MDSes,
3386                  * or "blacklisted" from older MDSes.
3387                  */
3388                 if (err_str && strnstr(*p, "blacklisted", len))
3389                         *blocklisted = true;
3390                 *p += len;
3391         }
3392         return 0;
3393 bad:
3394         return -1;
3395 }
3396
3397 /*
3398  * handle an MDS session control message
3399  */
3400 static void handle_session(struct ceph_mds_session *session,
3401                            struct ceph_msg *msg)
3402 {
3403         struct ceph_mds_client *mdsc = session->s_mdsc;
3404         int mds = session->s_mds;
3405         int msg_version = le16_to_cpu(msg->hdr.version);
3406         void *p = msg->front.iov_base;
3407         void *end = p + msg->front.iov_len;
3408         struct ceph_mds_session_head *h;
3409         u32 op;
3410         u64 seq, features = 0;
3411         int wake = 0;
3412         bool blocklisted = false;
3413
3414         /* decode */
3415         ceph_decode_need(&p, end, sizeof(*h), bad);
3416         h = p;
3417         p += sizeof(*h);
3418
3419         op = le32_to_cpu(h->op);
3420         seq = le64_to_cpu(h->seq);
3421
3422         if (msg_version >= 3) {
3423                 u32 len;
3424                 /* version >= 2, metadata */
3425                 if (__decode_session_metadata(&p, end, &blocklisted) < 0)
3426                         goto bad;
3427                 /* version >= 3, feature bits */
3428                 ceph_decode_32_safe(&p, end, len, bad);
3429                 if (len) {
3430                         ceph_decode_64_safe(&p, end, features, bad);
3431                         p += len - sizeof(features);
3432                 }
3433         }
3434
3435         mutex_lock(&mdsc->mutex);
3436         if (op == CEPH_SESSION_CLOSE) {
3437                 ceph_get_mds_session(session);
3438                 __unregister_session(mdsc, session);
3439         }
3440         /* FIXME: this ttl calculation is generous */
3441         session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
3442         mutex_unlock(&mdsc->mutex);
3443
3444         mutex_lock(&session->s_mutex);
3445
3446         dout("handle_session mds%d %s %p state %s seq %llu\n",
3447              mds, ceph_session_op_name(op), session,
3448              ceph_session_state_name(session->s_state), seq);
3449
3450         if (session->s_state == CEPH_MDS_SESSION_HUNG) {
3451                 session->s_state = CEPH_MDS_SESSION_OPEN;
3452                 pr_info("mds%d came back\n", session->s_mds);
3453         }
3454
3455         switch (op) {
3456         case CEPH_SESSION_OPEN:
3457                 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
3458                         pr_info("mds%d reconnect success\n", session->s_mds);
3459                 session->s_state = CEPH_MDS_SESSION_OPEN;
3460                 session->s_features = features;
3461                 renewed_caps(mdsc, session, 0);
3462                 if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT, &session->s_features))
3463                         metric_schedule_delayed(&mdsc->metric);
3464                 wake = 1;
3465                 if (mdsc->stopping)
3466                         __close_session(mdsc, session);
3467                 break;
3468
3469         case CEPH_SESSION_RENEWCAPS:
3470                 if (session->s_renew_seq == seq)
3471                         renewed_caps(mdsc, session, 1);
3472                 break;
3473
3474         case CEPH_SESSION_CLOSE:
3475                 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
3476                         pr_info("mds%d reconnect denied\n", session->s_mds);
3477                 session->s_state = CEPH_MDS_SESSION_CLOSED;
3478                 cleanup_session_requests(mdsc, session);
3479                 remove_session_caps(session);
3480                 wake = 2; /* for good measure */
3481                 wake_up_all(&mdsc->session_close_wq);
3482                 break;
3483
3484         case CEPH_SESSION_STALE:
3485                 pr_info("mds%d caps went stale, renewing\n",
3486                         session->s_mds);
3487                 spin_lock(&session->s_gen_ttl_lock);
3488                 session->s_cap_gen++;
3489                 session->s_cap_ttl = jiffies - 1;
3490                 spin_unlock(&session->s_gen_ttl_lock);
3491                 send_renew_caps(mdsc, session);
3492                 break;
3493
3494         case CEPH_SESSION_RECALL_STATE:
3495                 ceph_trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
3496                 break;
3497
3498         case CEPH_SESSION_FLUSHMSG:
3499                 /* flush cap releases */
3500                 spin_lock(&session->s_cap_lock);
3501                 if (session->s_num_cap_releases)
3502                         ceph_flush_cap_releases(mdsc, session);
3503                 spin_unlock(&session->s_cap_lock);
3504
3505                 send_flushmsg_ack(mdsc, session, seq);
3506                 break;
3507
3508         case CEPH_SESSION_FORCE_RO:
3509                 dout("force_session_readonly %p\n", session);
3510                 spin_lock(&session->s_cap_lock);
3511                 session->s_readonly = true;
3512                 spin_unlock(&session->s_cap_lock);
3513                 wake_up_session_caps(session, FORCE_RO);
3514                 break;
3515
3516         case CEPH_SESSION_REJECT:
3517                 WARN_ON(session->s_state != CEPH_MDS_SESSION_OPENING);
3518                 pr_info("mds%d rejected session\n", session->s_mds);
3519                 session->s_state = CEPH_MDS_SESSION_REJECTED;
3520                 cleanup_session_requests(mdsc, session);
3521                 remove_session_caps(session);
3522                 if (blocklisted)
3523                         mdsc->fsc->blocklisted = true;
3524                 wake = 2; /* for good measure */
3525                 break;
3526
3527         default:
3528                 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
3529                 WARN_ON(1);
3530         }
3531
3532         mutex_unlock(&session->s_mutex);
3533         if (wake) {
3534                 mutex_lock(&mdsc->mutex);
3535                 __wake_requests(mdsc, &session->s_waiting);
3536                 if (wake == 2)
3537                         kick_requests(mdsc, mds);
3538                 mutex_unlock(&mdsc->mutex);
3539         }
3540         if (op == CEPH_SESSION_CLOSE)
3541                 ceph_put_mds_session(session);
3542         return;
3543
3544 bad:
3545         pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
3546                (int)msg->front.iov_len);
3547         ceph_msg_dump(msg);
3548         return;
3549 }
3550
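/*
 * Release the cap references pinned on the parent directory for an
 * async request.  r_dir_caps is exchanged atomically with zero, so
 * this is safe to call more than once.
 */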
3551 void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req)
3552 {
3553         int dcaps;
3554
3555         dcaps = xchg(&req->r_dir_caps, 0);
3556         if (dcaps) {
3557                 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
3558                 ceph_put_cap_refs(ceph_inode(req->r_parent), dcaps);
3559         }
3560 }
3561
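/*
 * As above, but don't trigger a check_caps pass; used when replaying
 * requests during session reconnect.
 */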
3562 void ceph_mdsc_release_dir_caps_no_check(struct ceph_mds_request *req)
3563 {
3564         int dcaps;
3565
3566         dcaps = xchg(&req->r_dir_caps, 0);
3567         if (dcaps) {
3568                 dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
3569                 ceph_put_cap_refs_no_check_caps(ceph_inode(req->r_parent),
3570                                                 dcaps);
3571         }
3572 }
3573
3574 /*
3575  * called under session->mutex.
3576  */
3577 static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
3578                                    struct ceph_mds_session *session)
3579 {
3580         struct ceph_mds_request *req, *nreq;
3581         struct rb_node *p;
3582
3583         dout("replay_unsafe_requests mds%d\n", session->s_mds);
3584
3585         mutex_lock(&mdsc->mutex);
3586         list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item)
3587                 __send_request(mdsc, session, req, true);
3588
3589         /*
3590          * Also re-send old requests when the MDS enters the reconnect stage,
3591          * so the MDS can process completed requests in its clientreplay stage.
3592          */
3593         p = rb_first(&mdsc->request_tree);
3594         while (p) {
3595                 req = rb_entry(p, struct ceph_mds_request, r_node);
3596                 p = rb_next(p);
3597                 if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
3598                         continue;
3599                 if (req->r_attempts == 0)
3600                         continue; /* only old requests */
3601                 if (!req->r_session)
3602                         continue;
3603                 if (req->r_session->s_mds != session->s_mds)
3604                         continue;
3605
3606                 ceph_mdsc_release_dir_caps_no_check(req);
3607
3608                 __send_request(mdsc, session, req, true);
3609         }
3610         mutex_unlock(&mdsc->mutex);
3611 }
3612
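/*
 * The reconnect payload has grown past RECONNECT_MAX_SIZE.  Fix up the
 * cap/realm count in the current pagelist, send it as a partial (v5)
 * reconnect message, and install a fresh pagelist to keep encoding
 * into.  Only possible if the MDS supports multiple reconnect messages.
 */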
3613 static int send_reconnect_partial(struct ceph_reconnect_state *recon_state)
3614 {
3615         struct ceph_msg *reply;
3616         struct ceph_pagelist *_pagelist;
3617         struct page *page;
3618         __le32 *addr;
3619         int err = -ENOMEM;
3620
3621         if (!recon_state->allow_multi)
3622                 return -ENOSPC;
3623
3624         /* can't handle a message that contains both caps and realms */
3625         BUG_ON(!recon_state->nr_caps == !recon_state->nr_realms);
3626
3627         /* pre-allocate new pagelist */
3628         _pagelist = ceph_pagelist_alloc(GFP_NOFS);
3629         if (!_pagelist)
3630                 return -ENOMEM;
3631
3632         reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
3633         if (!reply)
3634                 goto fail_msg;
3635
3636         /* placeholder for nr_caps */
3637         err = ceph_pagelist_encode_32(_pagelist, 0);
3638         if (err < 0)
3639                 goto fail;
3640
3641         if (recon_state->nr_caps) {
3642                 /* currently encoding caps */
3643                 err = ceph_pagelist_encode_32(recon_state->pagelist, 0);
3644                 if (err)
3645                         goto fail;
3646         } else {
3647                 /* placeholder for nr_realms (currently encoding realms) */
3648                 err = ceph_pagelist_encode_32(_pagelist, 0);
3649                 if (err < 0)
3650                         goto fail;
3651         }
3652
3653         err = ceph_pagelist_encode_8(recon_state->pagelist, 1);
3654         if (err)
3655                 goto fail;
3656
3657         page = list_first_entry(&recon_state->pagelist->head, struct page, lru);
3658         addr = kmap_atomic(page);
3659         if (recon_state->nr_caps) {
3660                 /* currently encoding caps */
3661                 *addr = cpu_to_le32(recon_state->nr_caps);
3662         } else {
3663                 /* currently encoding realms */
3664                 *(addr + 1) = cpu_to_le32(recon_state->nr_realms);
3665         }
3666         kunmap_atomic(addr);
3667
3668         reply->hdr.version = cpu_to_le16(5);
3669         reply->hdr.compat_version = cpu_to_le16(4);
3670
3671         reply->hdr.data_len = cpu_to_le32(recon_state->pagelist->length);
3672         ceph_msg_data_add_pagelist(reply, recon_state->pagelist);
3673
3674         ceph_con_send(&recon_state->session->s_con, reply);
3675         ceph_pagelist_release(recon_state->pagelist);
3676
3677         recon_state->pagelist = _pagelist;
3678         recon_state->nr_caps = 0;
3679         recon_state->nr_realms = 0;
3680         recon_state->msg_version = 5;
3681         return 0;
3682 fail:
3683         ceph_msg_put(reply);
3684 fail_msg:
3685         ceph_pagelist_release(_pagelist);
3686         return err;
3687 }
3688
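/*
 * Find the primary dentry for an inode: the lone alias for a
 * directory, or the alias flagged CEPH_DENTRY_PRIMARY_LINK otherwise.
 * Returns a referenced dentry, or NULL if there is none.
 */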
3689 static struct dentry *d_find_primary(struct inode *inode)
3690 {
3691         struct dentry *alias, *dn = NULL;
3692
3693         if (hlist_empty(&inode->i_dentry))
3694                 return NULL;
3695
3696         spin_lock(&inode->i_lock);
3697         if (hlist_empty(&inode->i_dentry))
3698                 goto out_unlock;
3699
3700         if (S_ISDIR(inode->i_mode)) {
3701                 alias = hlist_entry(inode->i_dentry.first, struct dentry, d_u.d_alias);
3702                 if (!IS_ROOT(alias))
3703                         dn = dget(alias);
3704                 goto out_unlock;
3705         }
3706
3707         hlist_for_each_entry(alias, &inode->i_dentry, d_u.d_alias) {
3708                 spin_lock(&alias->d_lock);
3709                 if (!d_unhashed(alias) &&
3710                     (ceph_dentry(alias)->flags & CEPH_DENTRY_PRIMARY_LINK)) {
3711                         dn = dget_dlock(alias);
3712                 }
3713                 spin_unlock(&alias->d_lock);
3714                 if (dn)
3715                         break;
3716         }
3717 out_unlock:
3718         spin_unlock(&inode->i_lock);
3719         return dn;
3720 }
3721
3722 /*
3723  * Encode information about a cap for a reconnect with the MDS.
3724  */
3725 static int reconnect_caps_cb(struct inode *inode, struct ceph_cap *cap,
3726                           void *arg)
3727 {
3728         union {
3729                 struct ceph_mds_cap_reconnect v2;
3730                 struct ceph_mds_cap_reconnect_v1 v1;
3731         } rec;
3732         struct ceph_inode_info *ci = cap->ci;
3733         struct ceph_reconnect_state *recon_state = arg;
3734         struct ceph_pagelist *pagelist = recon_state->pagelist;
3735         struct dentry *dentry;
3736         char *path;
3737         int pathlen = 0, err;
3738         u64 pathbase;
3739         u64 snap_follows;
3740
3741         dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
3742              inode, ceph_vinop(inode), cap, cap->cap_id,
3743              ceph_cap_string(cap->issued));
3744
3745         dentry = d_find_primary(inode);
3746         if (dentry) {
3747                 /* set pathbase to parent dir when msg_version >= 2 */
3748                 path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase,
3749                                             recon_state->msg_version >= 2);
3750                 dput(dentry);
3751                 if (IS_ERR(path)) {
3752                         err = PTR_ERR(path);
3753                         goto out_err;
3754                 }
3755         } else {
3756                 path = NULL;
3757                 pathbase = 0;
3758         }
3759
3760         spin_lock(&ci->i_ceph_lock);
3761         cap->seq = 0;        /* reset cap seq */
3762         cap->issue_seq = 0;  /* and issue_seq */
3763         cap->mseq = 0;       /* and migrate_seq */
3764         cap->cap_gen = cap->session->s_cap_gen;
3765
3766         /* These are lost when the session goes away */
3767         if (S_ISDIR(inode->i_mode)) {
3768                 if (cap->issued & CEPH_CAP_DIR_CREATE) {
3769                         ceph_put_string(rcu_dereference_raw(ci->i_cached_layout.pool_ns));
3770                         memset(&ci->i_cached_layout, 0, sizeof(ci->i_cached_layout));
3771                 }
3772                 cap->issued &= ~CEPH_CAP_ANY_DIR_OPS;
3773         }
3774
3775         if (recon_state->msg_version >= 2) {
3776                 rec.v2.cap_id = cpu_to_le64(cap->cap_id);
3777                 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
3778                 rec.v2.issued = cpu_to_le32(cap->issued);
3779                 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
3780                 rec.v2.pathbase = cpu_to_le64(pathbase);
3781                 rec.v2.flock_len = (__force __le32)
3782                         ((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1);
3783         } else {
3784                 rec.v1.cap_id = cpu_to_le64(cap->cap_id);
3785                 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
3786                 rec.v1.issued = cpu_to_le32(cap->issued);
3787                 rec.v1.size = cpu_to_le64(inode->i_size);
3788                 ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime);
3789                 ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime);
3790                 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
3791                 rec.v1.pathbase = cpu_to_le64(pathbase);
3792         }
3793
3794         if (list_empty(&ci->i_cap_snaps)) {
3795                 snap_follows = ci->i_head_snapc ? ci->i_head_snapc->seq : 0;
3796         } else {
3797                 struct ceph_cap_snap *capsnap =
3798                         list_first_entry(&ci->i_cap_snaps,
3799                                          struct ceph_cap_snap, ci_item);
3800                 snap_follows = capsnap->follows;
3801         }
3802         spin_unlock(&ci->i_ceph_lock);
3803
3804         if (recon_state->msg_version >= 2) {
3805                 int num_fcntl_locks, num_flock_locks;
3806                 struct ceph_filelock *flocks = NULL;
3807                 size_t struct_len, total_len = sizeof(u64);
3808                 u8 struct_v = 0;
3809
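                /*
                 * The lock count can change between ceph_count_locks()
                 * and ceph_encode_locks_to_buffer(); if it grew we get
                 * -ENOSPC below and simply retry with recounted sizes.
                 */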
3810 encode_again:
3811                 if (rec.v2.flock_len) {
3812                         ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
3813                 } else {
3814                         num_fcntl_locks = 0;
3815                         num_flock_locks = 0;
3816                 }
3817                 if (num_fcntl_locks + num_flock_locks > 0) {
3818                         flocks = kmalloc_array(num_fcntl_locks + num_flock_locks,
3819                                                sizeof(struct ceph_filelock),
3820                                                GFP_NOFS);
3821                         if (!flocks) {
3822                                 err = -ENOMEM;
3823                                 goto out_err;
3824                         }
3825                         err = ceph_encode_locks_to_buffer(inode, flocks,
3826                                                           num_fcntl_locks,
3827                                                           num_flock_locks);
3828                         if (err) {
3829                                 kfree(flocks);
3830                                 flocks = NULL;
3831                                 if (err == -ENOSPC)
3832                                         goto encode_again;
3833                                 goto out_err;
3834                         }
3835                 } else {
3836                         kfree(flocks);
3837                         flocks = NULL;
3838                 }
3839
3840                 if (recon_state->msg_version >= 3) {
3841                         /* version, compat_version and struct_len */
3842                         total_len += 2 * sizeof(u8) + sizeof(u32);
3843                         struct_v = 2;
3844                 }
3845                 /*
3846                  * number of encoded locks is stable, so copy to pagelist
3847                  */
3848                 struct_len = 2 * sizeof(u32) +
3849                             (num_fcntl_locks + num_flock_locks) *
3850                             sizeof(struct ceph_filelock);
3851                 rec.v2.flock_len = cpu_to_le32(struct_len);
3852
3853                 struct_len += sizeof(u32) + pathlen + sizeof(rec.v2);
3854
3855                 if (struct_v >= 2)
3856                         struct_len += sizeof(u64); /* snap_follows */
3857
3858                 total_len += struct_len;
3859
3860                 if (pagelist->length + total_len > RECONNECT_MAX_SIZE) {
3861                         err = send_reconnect_partial(recon_state);
3862                         if (err)
3863                                 goto out_freeflocks;
3864                         pagelist = recon_state->pagelist;
3865                 }
3866
3867                 err = ceph_pagelist_reserve(pagelist, total_len);
3868                 if (err)
3869                         goto out_freeflocks;
3870
3871                 ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
3872                 if (recon_state->msg_version >= 3) {
3873                         ceph_pagelist_encode_8(pagelist, struct_v);
3874                         ceph_pagelist_encode_8(pagelist, 1);
3875                         ceph_pagelist_encode_32(pagelist, struct_len);
3876                 }
3877                 ceph_pagelist_encode_string(pagelist, path, pathlen);
3878                 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
3879                 ceph_locks_to_pagelist(flocks, pagelist,
3880                                        num_fcntl_locks, num_flock_locks);
3881                 if (struct_v >= 2)
3882                         ceph_pagelist_encode_64(pagelist, snap_follows);
3883 out_freeflocks:
3884                 kfree(flocks);
3885         } else {
3886                 err = ceph_pagelist_reserve(pagelist,
3887                                             sizeof(u64) + sizeof(u32) +
3888                                             pathlen + sizeof(rec.v1));
3889                 if (err)
3890                         goto out_err;
3891
3892                 ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
3893                 ceph_pagelist_encode_string(pagelist, path, pathlen);
3894                 ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
3895         }
3896
3897 out_err:
3898         ceph_mdsc_free_path(path, pathlen);
3899         if (!err)
3900                 recon_state->nr_caps++;
3901         return err;
3902 }
3903
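/*
 * Encode a ceph_mds_snaprealm_reconnect record for every snap realm
 * we know about, splitting into partial messages when the payload
 * would exceed RECONNECT_MAX_SIZE.
 */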
3904 static int encode_snap_realms(struct ceph_mds_client *mdsc,
3905                               struct ceph_reconnect_state *recon_state)
3906 {
3907         struct rb_node *p;
3908         struct ceph_pagelist *pagelist = recon_state->pagelist;
3909         int err = 0;
3910
3911         if (recon_state->msg_version >= 4) {
3912                 err = ceph_pagelist_encode_32(pagelist, mdsc->num_snap_realms);
3913                 if (err < 0)
3914                         goto fail;
3915         }
3916
3917         /*
3918          * snaprealms.  we provide mds with the ino, seq (version), and
3919          * parent for all of our realms.  If the mds has any newer info,
3920          * it will tell us.
3921          */
3922         for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
3923                 struct ceph_snap_realm *realm =
3924                        rb_entry(p, struct ceph_snap_realm, node);
3925                 struct ceph_mds_snaprealm_reconnect sr_rec;
3926
3927                 if (recon_state->msg_version >= 4) {
3928                         size_t need = sizeof(u8) * 2 + sizeof(u32) +
3929                                       sizeof(sr_rec);
3930
3931                         if (pagelist->length + need > RECONNECT_MAX_SIZE) {
3932                                 err = send_reconnect_partial(recon_state);
3933                                 if (err)
3934                                         goto fail;
3935                                 pagelist = recon_state->pagelist;
3936                         }
3937
3938                         err = ceph_pagelist_reserve(pagelist, need);
3939                         if (err)
3940                                 goto fail;
3941
3942                         ceph_pagelist_encode_8(pagelist, 1);
3943                         ceph_pagelist_encode_8(pagelist, 1);
3944                         ceph_pagelist_encode_32(pagelist, sizeof(sr_rec));
3945                 }
3946
3947                 dout(" adding snap realm %llx seq %lld parent %llx\n",
3948                      realm->ino, realm->seq, realm->parent_ino);
3949                 sr_rec.ino = cpu_to_le64(realm->ino);
3950                 sr_rec.seq = cpu_to_le64(realm->seq);
3951                 sr_rec.parent = cpu_to_le64(realm->parent_ino);
3952
3953                 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
3954                 if (err)
3955                         goto fail;
3956
3957                 recon_state->nr_realms++;
3958         }
3959 fail:
3960         return err;
3961 }
3962
3963
3964 /*
3965  * If an MDS fails and recovers, clients need to reconnect in order to
3966  * reestablish shared state.  This includes all caps issued through
3967  * this session _and_ the snap_realm hierarchy.  Because it's not
3968  * clear which snap realms the mds cares about, we send everything we
3969  * know about.. that ensures we'll then get any new info the
3970  * recovering MDS might have.
3971  *
3972  * This is a relatively heavyweight operation, but it's rare.
3973  */
3974 static void send_mds_reconnect(struct ceph_mds_client *mdsc,
3975                                struct ceph_mds_session *session)
3976 {
3977         struct ceph_msg *reply;
3978         int mds = session->s_mds;
3979         int err = -ENOMEM;
3980         struct ceph_reconnect_state recon_state = {
3981                 .session = session,
3982         };
3983         LIST_HEAD(dispose);
3984
3985         pr_info("mds%d reconnect start\n", mds);
3986
3987         recon_state.pagelist = ceph_pagelist_alloc(GFP_NOFS);
3988         if (!recon_state.pagelist)
3989                 goto fail_nopagelist;
3990
3991         reply = ceph_msg_new2(CEPH_MSG_CLIENT_RECONNECT, 0, 1, GFP_NOFS, false);
3992         if (!reply)
3993                 goto fail_nomsg;
3994
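        /* forget any inode numbers the MDS delegated to this session */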
3995         xa_destroy(&session->s_delegated_inos);
3996
3997         mutex_lock(&session->s_mutex);
3998         session->s_state = CEPH_MDS_SESSION_RECONNECTING;
3999         session->s_seq = 0;
4000
4001         dout("session %p state %s\n", session,
4002              ceph_session_state_name(session->s_state));
4003
4004         spin_lock(&session->s_gen_ttl_lock);
4005         session->s_cap_gen++;
4006         spin_unlock(&session->s_gen_ttl_lock);
4007
4008         spin_lock(&session->s_cap_lock);
4009         /* don't know if session is readonly */
4010         session->s_readonly = 0;
4011         /*
4012          * notify __ceph_remove_cap() that we are composing cap reconnect.
4013          * If a cap get released before being added to the cap reconnect,
4014          * __ceph_remove_cap() should skip queuing cap release.
4015          */
4016         session->s_cap_reconnect = 1;
4017         /* drop old cap expires; we're about to reestablish that state */
4018         detach_cap_releases(session, &dispose);
4019         spin_unlock(&session->s_cap_lock);
4020         dispose_cap_releases(mdsc, &dispose);
4021
4022         /* trim unused caps to reduce MDS's cache rejoin time */
4023         if (mdsc->fsc->sb->s_root)
4024                 shrink_dcache_parent(mdsc->fsc->sb->s_root);
4025
4026         ceph_con_close(&session->s_con);
4027         ceph_con_open(&session->s_con,
4028                       CEPH_ENTITY_TYPE_MDS, mds,
4029                       ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
4030
4031         /* replay unsafe requests */
4032         replay_unsafe_requests(mdsc, session);
4033
4034         ceph_early_kick_flushing_caps(mdsc, session);
4035
4036         down_read(&mdsc->snap_rwsem);
4037
4038         /* placeholder for nr_caps */
4039         err = ceph_pagelist_encode_32(recon_state.pagelist, 0);
4040         if (err)
4041                 goto fail;
4042
4043         if (test_bit(CEPHFS_FEATURE_MULTI_RECONNECT, &session->s_features)) {
4044                 recon_state.msg_version = 3;
4045                 recon_state.allow_multi = true;
4046         } else if (session->s_con.peer_features & CEPH_FEATURE_MDSENC) {
4047                 recon_state.msg_version = 3;
4048         } else {
4049                 recon_state.msg_version = 2;
4050         }
4051         /* traverse this session's caps */
4052         err = ceph_iterate_session_caps(session, reconnect_caps_cb, &recon_state);
4053
4054         spin_lock(&session->s_cap_lock);
4055         session->s_cap_reconnect = 0;
4056         spin_unlock(&session->s_cap_lock);
4057
4058         if (err < 0)
4059                 goto fail;
4060
4061         /* check if all realms can be encoded into current message */
4062         if (mdsc->num_snap_realms) {
4063                 size_t total_len =
4064                         recon_state.pagelist->length +
4065                         mdsc->num_snap_realms *
4066                         sizeof(struct ceph_mds_snaprealm_reconnect);
4067                 if (recon_state.msg_version >= 4) {
4068                         /* number of realms */
4069                         total_len += sizeof(u32);
4070                         /* version, compat_version and struct_len */
4071                         total_len += mdsc->num_snap_realms *
4072                                      (2 * sizeof(u8) + sizeof(u32));
4073                 }
4074                 if (total_len > RECONNECT_MAX_SIZE) {
4075                         if (!recon_state.allow_multi) {
4076                                 err = -ENOSPC;
4077                                 goto fail;
4078                         }
4079                         if (recon_state.nr_caps) {
4080                                 err = send_reconnect_partial(&recon_state);
4081                                 if (err)
4082                                         goto fail;
4083                         }
4084                         recon_state.msg_version = 5;
4085                 }
4086         }
4087
4088         err = encode_snap_realms(mdsc, &recon_state);
4089         if (err < 0)
4090                 goto fail;
4091
4092         if (recon_state.msg_version >= 5) {
4093                 err = ceph_pagelist_encode_8(recon_state.pagelist, 0);
4094                 if (err < 0)
4095                         goto fail;
4096         }
4097
4098         if (recon_state.nr_caps || recon_state.nr_realms) {
4099                 struct page *page =
4100                         list_first_entry(&recon_state.pagelist->head,
4101                                         struct page, lru);
4102                 __le32 *addr = kmap_atomic(page);
4103                 if (recon_state.nr_caps) {
4104                         WARN_ON(recon_state.nr_realms != mdsc->num_snap_realms);
4105                         *addr = cpu_to_le32(recon_state.nr_caps);
4106                 } else if (recon_state.msg_version >= 4) {
4107                         *(addr + 1) = cpu_to_le32(recon_state.nr_realms);
4108                 }
4109                 kunmap_atomic(addr);
4110         }
4111
4112         reply->hdr.version = cpu_to_le16(recon_state.msg_version);
4113         if (recon_state.msg_version >= 4)
4114                 reply->hdr.compat_version = cpu_to_le16(4);
4115
4116         reply->hdr.data_len = cpu_to_le32(recon_state.pagelist->length);
4117         ceph_msg_data_add_pagelist(reply, recon_state.pagelist);
4118
4119         ceph_con_send(&session->s_con, reply);
4120
4121         mutex_unlock(&session->s_mutex);
4122
4123         mutex_lock(&mdsc->mutex);
4124         __wake_requests(mdsc, &session->s_waiting);
4125         mutex_unlock(&mdsc->mutex);
4126
4127         up_read(&mdsc->snap_rwsem);
4128         ceph_pagelist_release(recon_state.pagelist);
4129         return;
4130
4131 fail:
4132         ceph_msg_put(reply);
4133         up_read(&mdsc->snap_rwsem);
4134         mutex_unlock(&session->s_mutex);
4135 fail_nomsg:
4136         ceph_pagelist_release(recon_state.pagelist);
4137 fail_nopagelist:
4138         pr_err("error %d preparing reconnect for mds%d\n", err, mds);
4139         return;
4140 }
4141
4142
4143 /*
4144  * compare old and new mdsmaps, kicking requests
4145  * and closing out old connections as necessary
4146  *
4147  * called under mdsc->mutex.
4148  */
4149 static void check_new_map(struct ceph_mds_client *mdsc,
4150                           struct ceph_mdsmap *newmap,
4151                           struct ceph_mdsmap *oldmap)
4152 {
4153         int i;
4154         int oldstate, newstate;
4155         struct ceph_mds_session *s;
4156
4157         dout("check_new_map new %u old %u\n",
4158              newmap->m_epoch, oldmap->m_epoch);
4159
4160         for (i = 0; i < oldmap->possible_max_rank && i < mdsc->max_sessions; i++) {
4161                 if (!mdsc->sessions[i])
4162                         continue;
4163                 s = mdsc->sessions[i];
4164                 oldstate = ceph_mdsmap_get_state(oldmap, i);
4165                 newstate = ceph_mdsmap_get_state(newmap, i);
4166
4167                 dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
4168                      i, ceph_mds_state_name(oldstate),
4169                      ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
4170                      ceph_mds_state_name(newstate),
4171                      ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
4172                      ceph_session_state_name(s->s_state));
4173
4174                 if (i >= newmap->possible_max_rank) {
4175                         /* force close session for stopped mds */
4176                         ceph_get_mds_session(s);
4177                         __unregister_session(mdsc, s);
4178                         __wake_requests(mdsc, &s->s_waiting);
4179                         mutex_unlock(&mdsc->mutex);
4180
4181                         mutex_lock(&s->s_mutex);
4182                         cleanup_session_requests(mdsc, s);
4183                         remove_session_caps(s);
4184                         mutex_unlock(&s->s_mutex);
4185
4186                         ceph_put_mds_session(s);
4187
4188                         mutex_lock(&mdsc->mutex);
4189                         kick_requests(mdsc, i);
4190                         continue;
4191                 }
4192
4193                 if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
4194                            ceph_mdsmap_get_addr(newmap, i),
4195                            sizeof(struct ceph_entity_addr))) {
4196                         /* just close it */
4197                         mutex_unlock(&mdsc->mutex);
4198                         mutex_lock(&s->s_mutex);
4199                         mutex_lock(&mdsc->mutex);
4200                         ceph_con_close(&s->s_con);
4201                         mutex_unlock(&s->s_mutex);
4202                         s->s_state = CEPH_MDS_SESSION_RESTARTING;
4203                 } else if (oldstate == newstate) {
4204                         continue;  /* nothing new with this mds */
4205                 }
4206
4207                 /*
4208                  * send reconnect?
4209                  */
4210                 if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
4211                     newstate >= CEPH_MDS_STATE_RECONNECT) {
4212                         mutex_unlock(&mdsc->mutex);
4213                         send_mds_reconnect(mdsc, s);
4214                         mutex_lock(&mdsc->mutex);
4215                 }
4216
4217                 /*
4218                  * kick request on any mds that has gone active.
4219                  */
4220                 if (oldstate < CEPH_MDS_STATE_ACTIVE &&
4221                     newstate >= CEPH_MDS_STATE_ACTIVE) {
4222                         if (oldstate != CEPH_MDS_STATE_CREATING &&
4223                             oldstate != CEPH_MDS_STATE_STARTING)
4224                                 pr_info("mds%d recovery completed\n", s->s_mds);
4225                         kick_requests(mdsc, i);
4226                         mutex_unlock(&mdsc->mutex);
4227                         mutex_lock(&s->s_mutex);
4228                         mutex_lock(&mdsc->mutex);
4229                         ceph_kick_flushing_caps(mdsc, s);
4230                         mutex_unlock(&s->s_mutex);
4231                         wake_up_session_caps(s, RECONNECT);
4232                 }
4233         }
4234
4235         for (i = 0; i < newmap->possible_max_rank && i < mdsc->max_sessions; i++) {
4236                 s = mdsc->sessions[i];
4237                 if (!s)
4238                         continue;
4239                 if (!ceph_mdsmap_is_laggy(newmap, i))
4240                         continue;
4241                 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
4242                     s->s_state == CEPH_MDS_SESSION_HUNG ||
4243                     s->s_state == CEPH_MDS_SESSION_CLOSING) {
4244                         dout(" connecting to export targets of laggy mds%d\n",
4245                              i);
4246                         __open_export_target_sessions(mdsc, s);
4247                 }
4248         }
4249 }
4250
4251
4252
4253 /*
4254  * leases
4255  */
4256
4257 /*
4258  * caller must hold session s_mutex, dentry->d_lock
4259  */
4260 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
4261 {
4262         struct ceph_dentry_info *di = ceph_dentry(dentry);
4263
4264         ceph_put_mds_session(di->lease_session);
4265         di->lease_session = NULL;
4266 }
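
/*
 * A minimal sketch (illustration only) of the locking a caller of
 * __ceph_mdsc_drop_dentry_lease() is expected to hold, so that
 * di->lease_session cannot change underneath it:
 *
 *	spin_lock(&dentry->d_lock);
 *	__ceph_mdsc_drop_dentry_lease(dentry);
 *	spin_unlock(&dentry->d_lock);
 *
 * handle_lease() below follows exactly this pattern for the REVOKE case
 * (with the session's s_mutex held as well).
 */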
4267
4268 static void handle_lease(struct ceph_mds_client *mdsc,
4269                          struct ceph_mds_session *session,
4270                          struct ceph_msg *msg)
4271 {
4272         struct super_block *sb = mdsc->fsc->sb;
4273         struct inode *inode;
4274         struct dentry *parent, *dentry;
4275         struct ceph_dentry_info *di;
4276         int mds = session->s_mds;
4277         struct ceph_mds_lease *h = msg->front.iov_base;
4278         u32 seq;
4279         struct ceph_vino vino;
4280         struct qstr dname;
4281         int release = 0;
4282
4283         dout("handle_lease from mds%d\n", mds);
4284
4285         /* decode */
4286         if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
4287                 goto bad;
4288         vino.ino = le64_to_cpu(h->ino);
4289         vino.snap = CEPH_NOSNAP;
4290         seq = le32_to_cpu(h->seq);
4291         dname.len = get_unaligned_le32(h + 1);
4292         if (msg->front.iov_len < sizeof(*h) + sizeof(u32) + dname.len)
4293                 goto bad;
4294         dname.name = (void *)(h + 1) + sizeof(u32);
4295
4296         /* lookup inode */
4297         inode = ceph_find_inode(sb, vino);
4298         dout("handle_lease %s, ino %llx %p %.*s\n",
4299              ceph_lease_op_name(h->action), vino.ino, inode,
4300              dname.len, dname.name);
4301
4302         mutex_lock(&session->s_mutex);
4303         inc_session_sequence(session);
4304
4305         if (!inode) {
4306                 dout("handle_lease no inode %llx\n", vino.ino);
4307                 goto release;
4308         }
4309
4310         /* dentry */
4311         parent = d_find_alias(inode);
4312         if (!parent) {
4313                 dout("no parent dentry on inode %p\n", inode);
4314                 WARN_ON(1);
4315                 goto release;  /* hrm... */
4316         }
4317         dname.hash = full_name_hash(parent, dname.name, dname.len);
4318         dentry = d_lookup(parent, &dname);
4319         dput(parent);
4320         if (!dentry)
4321                 goto release;
4322
4323         spin_lock(&dentry->d_lock);
4324         di = ceph_dentry(dentry);
4325         switch (h->action) {
4326         case CEPH_MDS_LEASE_REVOKE:
4327                 if (di->lease_session == session) {
4328                         if (ceph_seq_cmp(di->lease_seq, seq) > 0)
4329                                 h->seq = cpu_to_le32(di->lease_seq);
4330                         __ceph_mdsc_drop_dentry_lease(dentry);
4331                 }
4332                 release = 1;
4333                 break;
4334
4335         case CEPH_MDS_LEASE_RENEW:
4336                 if (di->lease_session == session &&
4337                     di->lease_gen == session->s_cap_gen &&
4338                     di->lease_renew_from &&
4339                     di->lease_renew_after == 0) {
4340                         unsigned long duration =
4341                                 msecs_to_jiffies(le32_to_cpu(h->duration_ms));
4342
4343                         di->lease_seq = seq;
4344                         di->time = di->lease_renew_from + duration;
4345                         di->lease_renew_after = di->lease_renew_from +
4346                                 (duration >> 1);
4347                         di->lease_renew_from = 0;
4348                 }
4349                 break;
4350         }
4351         spin_unlock(&dentry->d_lock);
4352         dput(dentry);
4353
4354         if (!release)
4355                 goto out;
4356
4357 release:
4358         /* let's just reuse the same message */
4359         h->action = CEPH_MDS_LEASE_REVOKE_ACK;
4360         ceph_msg_get(msg);
4361         ceph_con_send(&session->s_con, msg);
4362
4363 out:
4364         mutex_unlock(&session->s_mutex);
4365         /* avoid calling iput_final() in mds dispatch threads */
4366         ceph_async_iput(inode);
4367         return;
4368
4369 bad:
4370         pr_err("corrupt lease message\n");
4371         ceph_msg_dump(msg);
4372 }
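
/*
 * For reference, the front of the CEPH_MSG_CLIENT_LEASE message decoded in
 * handle_lease() above is laid out as a fixed header followed by an
 * unaligned, length-prefixed dentry name:
 *
 *	struct ceph_mds_lease h;   - action, ino, seq, duration_ms, ...
 *	u32 dname_len;             - read with get_unaligned_le32(h + 1)
 *	char dname[dname_len];     - starts at (void *)(h + 1) + sizeof(u32)
 *
 * which is why both front.iov_len sanity checks add sizeof(u32) (and then
 * dname.len) on top of sizeof(*h).
 */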
4373
4374 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
4375                               struct dentry *dentry, char action,
4376                               u32 seq)
4377 {
4378         struct ceph_msg *msg;
4379         struct ceph_mds_lease *lease;
4380         struct inode *dir;
4381         int len = sizeof(*lease) + sizeof(u32) + NAME_MAX;
4382
4383         dout("lease_send_msg dentry %p %s to mds%d\n",
4384              dentry, ceph_lease_op_name(action), session->s_mds);
4385
4386         msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS, false);
4387         if (!msg)
4388                 return;
4389         lease = msg->front.iov_base;
4390         lease->action = action;
4391         lease->seq = cpu_to_le32(seq);
4392
4393         spin_lock(&dentry->d_lock);
4394         dir = d_inode(dentry->d_parent);
4395         lease->ino = cpu_to_le64(ceph_ino(dir));
4396         lease->first = lease->last = cpu_to_le64(ceph_snap(dir));
4397
4398         put_unaligned_le32(dentry->d_name.len, lease + 1);
4399         memcpy((void *)(lease + 1) + 4,
4400                dentry->d_name.name, dentry->d_name.len);
4401         spin_unlock(&dentry->d_lock);
4402         /*
4403          * if this is a preemptive lease RELEASE, no need to
4404          * flush request stream, since the actual request will
4405          * soon follow.
4406          */
4407         msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
4408
4409         ceph_con_send(&session->s_con, msg);
4410 }
4411
4412 /*
4413  * lock and unlock the session, to wait for ongoing session activity to finish
4414  */
4415 static void lock_unlock_session(struct ceph_mds_session *s)
4416 {
4417         mutex_lock(&s->s_mutex);
4418         mutex_unlock(&s->s_mutex);
4419 }
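
/*
 * Why an empty lock/unlock pair works as a barrier, in sketch form: any
 * thread that held s_mutex when we were called must drop it before our
 * mutex_lock() can return:
 *
 *	other thread                        lock_unlock_session()
 *	------------                        ---------------------
 *	mutex_lock(&s->s_mutex);
 *	... session activity ...            mutex_lock(&s->s_mutex);  - blocks
 *	mutex_unlock(&s->s_mutex);          - acquires, then unlocks
 *
 * so by the time lock_unlock_session() returns, that activity is done.
 */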
4420
4421 static void maybe_recover_session(struct ceph_mds_client *mdsc)
4422 {
4423         struct ceph_fs_client *fsc = mdsc->fsc;
4424
4425         if (!ceph_test_mount_opt(fsc, CLEANRECOVER))
4426                 return;
4427
4428         if (READ_ONCE(fsc->mount_state) != CEPH_MOUNT_MOUNTED)
4429                 return;
4430
4431         if (!READ_ONCE(fsc->blocklisted))
4432                 return;
4433
4434         if (fsc->last_auto_reconnect &&
4435             time_before(jiffies, fsc->last_auto_reconnect + HZ * 60 * 30))
4436                 return;
4437
4438         pr_info("auto reconnect after blocklisted\n");
4439         fsc->last_auto_reconnect = jiffies;
4440         ceph_force_reconnect(fsc->sb);
4441 }
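
/*
 * Worked example for the backoff above: HZ * 60 * 30 is thirty minutes in
 * jiffies, so after being blocklisted we force a reconnect at most once
 * every half hour.  The same pattern with a hypothetical one-minute
 * interval would read:
 *
 *	if (last && time_before(jiffies, last + HZ * 60))
 *		return;		- tried less than a minute ago
 *	last = jiffies;
 */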
4442
4443 bool check_session_state(struct ceph_mds_session *s)
4444 {
4445         switch (s->s_state) {
4446         case CEPH_MDS_SESSION_OPEN:
4447                 if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
4448                         s->s_state = CEPH_MDS_SESSION_HUNG;
4449                         pr_info("mds%d hung\n", s->s_mds);
4450                 }
4451                 break;
4452         case CEPH_MDS_SESSION_CLOSING:
4453                 /* Should never reach this when we're unmounting */
4454                 WARN_ON_ONCE(s->s_ttl);
4455                 fallthrough;
4456         case CEPH_MDS_SESSION_NEW:
4457         case CEPH_MDS_SESSION_RESTARTING:
4458         case CEPH_MDS_SESSION_CLOSED:
4459         case CEPH_MDS_SESSION_REJECTED:
4460                 return false;
4461         }
4462
4463         return true;
4464 }
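
/*
 * Callers treat the return value as "keep working with this session".
 * delayed_work() below, for example, does:
 *
 *	if (!check_session_state(s)) {
 *		ceph_put_mds_session(s);
 *		continue;
 *	}
 *
 * so sessions in NEW/RESTARTING/CLOSED/REJECTED (and CLOSING) states are
 * skipped, and only live ones get cap renews and keepalives.
 */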
4465
4466 /*
4467  * If the sequence is incremented while we're waiting on a REQUEST_CLOSE reply,
4468  * then we need to retransmit that request.
4469  */
4470 void inc_session_sequence(struct ceph_mds_session *s)
4471 {
4472         lockdep_assert_held(&s->s_mutex);
4473
4474         s->s_seq++;
4475
4476         if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
4477                 int ret;
4478
4479                 dout("resending session close request for mds%d\n", s->s_mds);
4480                 ret = request_close_session(s);
4481                 if (ret < 0)
4482                         pr_err("unable to close session to mds%d: %d\n",
4483                                s->s_mds, ret);
4484         }
4485 }
4486
4487 /*
4488  * delayed work -- periodically trim expired leases, renew caps with the
4489  * mds.  If the @delay parameter is 0 or greater than 5 secs, the default
4490  * workqueue delay of 5 secs is used instead.
4491  */
4492 static void schedule_delayed(struct ceph_mds_client *mdsc, unsigned long delay)
4493 {
4494         unsigned long max_delay = HZ * 5;
4495
4496         /* 5 secs default delay */
4497         if (!delay || (delay > max_delay))
4498                 delay = max_delay;
4499         schedule_delayed_work(&mdsc->delayed_work,
4500                               round_jiffies_relative(delay));
4501 }
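
/*
 * Example usage, as in delayed_work() and ceph_mdsc_handle_mdsmap() below:
 *
 *	schedule_delayed(mdsc, 0);	- 0 selects the 5 sec default
 *	schedule_delayed(mdsc, delay);	- clamped to at most 5 secs
 *
 * round_jiffies_relative() rounds the expiry to a whole second so that
 * timers across the system tend to fire together and save wakeups.
 */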
4502
4503 static void delayed_work(struct work_struct *work)
4504 {
4505         struct ceph_mds_client *mdsc =
4506                 container_of(work, struct ceph_mds_client, delayed_work.work);
4507         unsigned long delay;
4508         int renew_interval;
4509         int renew_caps;
4510         int i;
4511
4512         dout("mdsc delayed_work\n");
4513
4514         if (mdsc->stopping >= CEPH_MDSC_STOPPING_FLUSHED)
4515                 return;
4516
4517         mutex_lock(&mdsc->mutex);
4518         renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
4519         renew_caps = time_after_eq(jiffies, HZ*renew_interval +
4520                                    mdsc->last_renew_caps);
4521         if (renew_caps)
4522                 mdsc->last_renew_caps = jiffies;
4523
4524         for (i = 0; i < mdsc->max_sessions; i++) {
4525                 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
4526                 if (!s)
4527                         continue;
4528
4529                 if (!check_session_state(s)) {
4530                         ceph_put_mds_session(s);
4531                         continue;
4532                 }
4533                 mutex_unlock(&mdsc->mutex);
4534
4535                 mutex_lock(&s->s_mutex);
4536                 if (renew_caps)
4537                         send_renew_caps(mdsc, s);
4538                 else
4539                         ceph_con_keepalive(&s->s_con);
4540                 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
4541                     s->s_state == CEPH_MDS_SESSION_HUNG)
4542                         ceph_send_cap_releases(mdsc, s);
4543                 mutex_unlock(&s->s_mutex);
4544                 ceph_put_mds_session(s);
4545
4546                 mutex_lock(&mdsc->mutex);
4547         }
4548         mutex_unlock(&mdsc->mutex);
4549
4550         delay = ceph_check_delayed_caps(mdsc);
4551
4552         ceph_queue_cap_reclaim_work(mdsc);
4553
4554         ceph_trim_snapid_map(mdsc);
4555
4556         maybe_recover_session(mdsc);
4557
4558         schedule_delayed(mdsc, delay);
4559 }
4560
4561 int ceph_mdsc_init(struct ceph_fs_client *fsc)
4563 {
4564         struct ceph_mds_client *mdsc;
4565         int err;
4566
4567         mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
4568         if (!mdsc)
4569                 return -ENOMEM;
4570         mdsc->fsc = fsc;
4571         mutex_init(&mdsc->mutex);
4572         mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
4573         if (!mdsc->mdsmap) {
4574                 err = -ENOMEM;
4575                 goto err_mdsc;
4576         }
4577
4578         init_completion(&mdsc->safe_umount_waiters);
4579         init_waitqueue_head(&mdsc->session_close_wq);
4580         INIT_LIST_HEAD(&mdsc->waiting_for_map);
4581         mdsc->sessions = NULL;
4582         atomic_set(&mdsc->num_sessions, 0);
4583         mdsc->max_sessions = 0;
4584         mdsc->stopping = 0;
4585         atomic64_set(&mdsc->quotarealms_count, 0);
4586         mdsc->quotarealms_inodes = RB_ROOT;
4587         mutex_init(&mdsc->quotarealms_inodes_mutex);
4588         mdsc->last_snap_seq = 0;
4589         init_rwsem(&mdsc->snap_rwsem);
4590         mdsc->snap_realms = RB_ROOT;
4591         INIT_LIST_HEAD(&mdsc->snap_empty);
4592         mdsc->num_snap_realms = 0;
4593         spin_lock_init(&mdsc->snap_empty_lock);
4594         mdsc->last_tid = 0;
4595         mdsc->oldest_tid = 0;
4596         mdsc->request_tree = RB_ROOT;
4597         INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
4598         mdsc->last_renew_caps = jiffies;
4599         INIT_LIST_HEAD(&mdsc->cap_delay_list);
4600         INIT_LIST_HEAD(&mdsc->cap_wait_list);
4601         spin_lock_init(&mdsc->cap_delay_lock);
4602         INIT_LIST_HEAD(&mdsc->snap_flush_list);
4603         spin_lock_init(&mdsc->snap_flush_lock);
4604         mdsc->last_cap_flush_tid = 1;
4605         INIT_LIST_HEAD(&mdsc->cap_flush_list);
4606         INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
4607         mdsc->num_cap_flushing = 0;
4608         spin_lock_init(&mdsc->cap_dirty_lock);
4609         init_waitqueue_head(&mdsc->cap_flushing_wq);
4610         INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
4611         atomic_set(&mdsc->cap_reclaim_pending, 0);
4612         err = ceph_metric_init(&mdsc->metric);
4613         if (err)
4614                 goto err_mdsmap;
4615
4616         spin_lock_init(&mdsc->dentry_list_lock);
4617         INIT_LIST_HEAD(&mdsc->dentry_leases);
4618         INIT_LIST_HEAD(&mdsc->dentry_dir_leases);
4619
4620         ceph_caps_init(mdsc);
4621         ceph_adjust_caps_max_min(mdsc, fsc->mount_options);
4622
4623         spin_lock_init(&mdsc->snapid_map_lock);
4624         mdsc->snapid_map_tree = RB_ROOT;
4625         INIT_LIST_HEAD(&mdsc->snapid_map_lru);
4626
4627         init_rwsem(&mdsc->pool_perm_rwsem);
4628         mdsc->pool_perm_tree = RB_ROOT;
4629
4630         strscpy(mdsc->nodename, utsname()->nodename,
4631                 sizeof(mdsc->nodename));
4632
4633         fsc->mdsc = mdsc;
4634         return 0;
4635
4636 err_mdsmap:
4637         kfree(mdsc->mdsmap);
4638 err_mdsc:
4639         kfree(mdsc);
4640         return err;
4641 }
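
/*
 * The error unwinding above is the usual kernel goto ladder: each label
 * frees only what was successfully allocated before the failure point, in
 * reverse order.  In sketch form:
 *
 *	mdsc = kzalloc(...);		- freed at err_mdsc
 *	mdsc->mdsmap = kzalloc(...);	- on failure: goto err_mdsc
 *	err = ceph_metric_init(...);	- on failure: goto err_mdsmap
 */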
4642
4643 /*
4644  * Wait for safe replies on open mds requests.  If we time out, drop
4645  * all requests from the tree to avoid dangling dentry refs.
4646  */
4647 static void wait_requests(struct ceph_mds_client *mdsc)
4648 {
4649         struct ceph_options *opts = mdsc->fsc->client->options;
4650         struct ceph_mds_request *req;
4651
4652         mutex_lock(&mdsc->mutex);
4653         if (__get_oldest_req(mdsc)) {
4654                 mutex_unlock(&mdsc->mutex);
4655
4656                 dout("wait_requests waiting for requests\n");
4657                 wait_for_completion_timeout(&mdsc->safe_umount_waiters,
4658                                     ceph_timeout_jiffies(opts->mount_timeout));
4659
4660                 /* tear down remaining requests */
4661                 mutex_lock(&mdsc->mutex);
4662                 while ((req = __get_oldest_req(mdsc))) {
4663                         dout("wait_requests timed out on tid %llu\n",
4664                              req->r_tid);
4665                         list_del_init(&req->r_wait);
4666                         __unregister_request(mdsc, req);
4667                 }
4668         }
4669         mutex_unlock(&mdsc->mutex);
4670         dout("wait_requests done\n");
4671 }
4672
4673 void send_flush_mdlog(struct ceph_mds_session *s)
4674 {
4675         struct ceph_msg *msg;
4676
4677         /*
4678          * A pre-luminous MDS crashes when it sees an unknown session request.
4679          */
4680         if (!CEPH_HAVE_FEATURE(s->s_con.peer_features, SERVER_LUMINOUS))
4681                 return;
4682
4683         mutex_lock(&s->s_mutex);
4684         dout("request mdlog flush to mds%d (%s) seq %lld\n", s->s_mds,
4685              ceph_session_state_name(s->s_state), s->s_seq);
4686         msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_FLUSH_MDLOG,
4687                                       s->s_seq);
4688         if (!msg) {
4689                 pr_err("failed to request mdlog flush to mds%d (%s) seq %lld\n",
4690                        s->s_mds, ceph_session_state_name(s->s_state), s->s_seq);
4691         } else {
4692                 ceph_con_send(&s->s_con, msg);
4693         }
4694         mutex_unlock(&s->s_mutex);
4695 }
4696
4697 /*
4698  * called before the mount goes read-only, and before dentries are torn
4699  * down.  (hmm, does this still race with new lookups?)
4700  */
4701 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
4702 {
4703         dout("pre_umount\n");
4704         mdsc->stopping = CEPH_MDSC_STOPPING_BEGIN;
4705
4706         ceph_mdsc_iterate_sessions(mdsc, send_flush_mdlog, true);
4707         ceph_mdsc_iterate_sessions(mdsc, lock_unlock_session, false);
4708         ceph_flush_dirty_caps(mdsc);
4709         wait_requests(mdsc);
4710
4711         /*
4712          * wait for reply handlers to drop their request refs and
4713          * their inode/dcache refs
4714          */
4715         ceph_msgr_flush();
4716
4717         ceph_cleanup_quotarealms_inodes(mdsc);
4718 }
4719
4720 /*
4721  * wait for all write mds requests to flush.
4722  */
4723 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
4724 {
4725         struct ceph_mds_request *req = NULL, *nextreq;
4726         struct rb_node *n;
4727
4728         mutex_lock(&mdsc->mutex);
4729         dout("wait_unsafe_requests want %lld\n", want_tid);
4730 restart:
4731         req = __get_oldest_req(mdsc);
4732         while (req && req->r_tid <= want_tid) {
4733                 /* find next request */
4734                 n = rb_next(&req->r_node);
4735                 if (n)
4736                         nextreq = rb_entry(n, struct ceph_mds_request, r_node);
4737                 else
4738                         nextreq = NULL;
4739                 if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
4740                     (req->r_op & CEPH_MDS_OP_WRITE)) {
4741                         /* write op */
4742                         ceph_mdsc_get_request(req);
4743                         if (nextreq)
4744                                 ceph_mdsc_get_request(nextreq);
4745                         mutex_unlock(&mdsc->mutex);
4746                         dout("wait_unsafe_requests  wait on %llu (want %llu)\n",
4747                              req->r_tid, want_tid);
4748                         wait_for_completion(&req->r_safe_completion);
4749                         mutex_lock(&mdsc->mutex);
4750                         ceph_mdsc_put_request(req);
4751                         if (!nextreq)
4752                                 break;  /* no next request existed, so we're done! */
4753                         if (RB_EMPTY_NODE(&nextreq->r_node)) {
4754                                 /* next request was removed from tree */
4755                                 ceph_mdsc_put_request(nextreq);
4756                                 goto restart;
4757                         }
4758                         ceph_mdsc_put_request(nextreq);  /* won't go away */
4759                 }
4760                 req = nextreq;
4761         }
4762         mutex_unlock(&mdsc->mutex);
4763         dout("wait_unsafe_requests done\n");
4764 }
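
/*
 * The loop above is an instance of a common pattern for walking an rbtree
 * that may change while we sleep: pin the *next* node with a reference
 * before dropping mdsc->mutex, then on relock use RB_EMPTY_NODE() to
 * detect that the pinned node was unlinked in the meantime.  Sketched:
 *
 *	ceph_mdsc_get_request(nextreq);			- pin next
 *	mutex_unlock(&mdsc->mutex);
 *	wait_for_completion(&req->r_safe_completion);	- may sleep a while
 *	mutex_lock(&mdsc->mutex);
 *	if (RB_EMPTY_NODE(&nextreq->r_node))
 *		goto restart;				- next was removed
 */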
4765
4766 void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
4767 {
4768         u64 want_tid, want_flush;
4769
4770         if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
4771                 return;
4772
4773         dout("sync\n");
4774         mutex_lock(&mdsc->mutex);
4775         want_tid = mdsc->last_tid;
4776         mutex_unlock(&mdsc->mutex);
4777
4778         ceph_flush_dirty_caps(mdsc);
4779         spin_lock(&mdsc->cap_dirty_lock);
4780         want_flush = mdsc->last_cap_flush_tid;
4781         if (!list_empty(&mdsc->cap_flush_list)) {
4782                 struct ceph_cap_flush *cf =
4783                         list_last_entry(&mdsc->cap_flush_list,
4784                                         struct ceph_cap_flush, g_list);
4785                 cf->wake = true;
4786         }
4787         spin_unlock(&mdsc->cap_dirty_lock);
4788
4789         dout("sync want tid %lld flush_seq %lld\n",
4790              want_tid, want_flush);
4791
4792         wait_unsafe_requests(mdsc, want_tid);
4793         wait_caps_flush(mdsc, want_flush);
4794 }
4795
4796 /*
4797  * true if all sessions are closed, or we force unmount
4798  */
4799 static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped)
4800 {
4801         if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
4802                 return true;
4803         return atomic_read(&mdsc->num_sessions) <= skipped;
4804 }
4805
4806 /*
4807  * called after sb is ro.
4808  * called after the sb has been made read-only.
4809 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
4810 {
4811         struct ceph_options *opts = mdsc->fsc->client->options;
4812         struct ceph_mds_session *session;
4813         int i;
4814         int skipped = 0;
4815
4816         dout("close_sessions\n");
4817
4818         /* close sessions */
4819         mutex_lock(&mdsc->mutex);
4820         for (i = 0; i < mdsc->max_sessions; i++) {
4821                 session = __ceph_lookup_mds_session(mdsc, i);
4822                 if (!session)
4823                         continue;
4824                 mutex_unlock(&mdsc->mutex);
4825                 mutex_lock(&session->s_mutex);
4826                 if (__close_session(mdsc, session) <= 0)
4827                         skipped++;
4828                 mutex_unlock(&session->s_mutex);
4829                 ceph_put_mds_session(session);
4830                 mutex_lock(&mdsc->mutex);
4831         }
4832         mutex_unlock(&mdsc->mutex);
4833
4834         dout("waiting for sessions to close\n");
4835         wait_event_timeout(mdsc->session_close_wq,
4836                            done_closing_sessions(mdsc, skipped),
4837                            ceph_timeout_jiffies(opts->mount_timeout));
4838
4839         /* tear down remaining sessions */
4840         mutex_lock(&mdsc->mutex);
4841         for (i = 0; i < mdsc->max_sessions; i++) {
4842                 if (mdsc->sessions[i]) {
4843                         session = ceph_get_mds_session(mdsc->sessions[i]);
4844                         __unregister_session(mdsc, session);
4845                         mutex_unlock(&mdsc->mutex);
4846                         mutex_lock(&session->s_mutex);
4847                         remove_session_caps(session);
4848                         mutex_unlock(&session->s_mutex);
4849                         ceph_put_mds_session(session);
4850                         mutex_lock(&mdsc->mutex);
4851                 }
4852         }
4853         WARN_ON(!list_empty(&mdsc->cap_delay_list));
4854         mutex_unlock(&mdsc->mutex);
4855
4856         ceph_cleanup_snapid_map(mdsc);
4857         ceph_cleanup_empty_realms(mdsc);
4858
4859         cancel_work_sync(&mdsc->cap_reclaim_work);
4860         cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
4861
4862         dout("stopped\n");
4863 }
4864
4865 void ceph_mdsc_force_umount(struct ceph_mds_client *mdsc)
4866 {
4867         struct ceph_mds_session *session;
4868         int mds;
4869
4870         dout("force umount\n");
4871
4872         mutex_lock(&mdsc->mutex);
4873         for (mds = 0; mds < mdsc->max_sessions; mds++) {
4874                 session = __ceph_lookup_mds_session(mdsc, mds);
4875                 if (!session)
4876                         continue;
4877
4878                 if (session->s_state == CEPH_MDS_SESSION_REJECTED)
4879                         __unregister_session(mdsc, session);
4880                 __wake_requests(mdsc, &session->s_waiting);
4881                 mutex_unlock(&mdsc->mutex);
4882
4883                 mutex_lock(&session->s_mutex);
4884                 __close_session(mdsc, session);
4885                 if (session->s_state == CEPH_MDS_SESSION_CLOSING) {
4886                         cleanup_session_requests(mdsc, session);
4887                         remove_session_caps(session);
4888                 }
4889                 mutex_unlock(&session->s_mutex);
4890                 ceph_put_mds_session(session);
4891
4892                 mutex_lock(&mdsc->mutex);
4893                 kick_requests(mdsc, mds);
4894         }
4895         __wake_requests(mdsc, &mdsc->waiting_for_map);
4896         mutex_unlock(&mdsc->mutex);
4897 }
4898
4899 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
4900 {
4901         dout("stop\n");
4902         /*
4903          * Make sure the delayed work has stopped before releasing
4904          * the resources.
4905          *
4906          * cancel_delayed_work_sync() only guarantees that the work
4907          * finishes executing; the delayed work may re-arm itself
4908          * again after that, hence the flush below.
4909          */
4910         flush_delayed_work(&mdsc->delayed_work);
4911
4912         if (mdsc->mdsmap)
4913                 ceph_mdsmap_destroy(mdsc->mdsmap);
4914         kfree(mdsc->sessions);
4915         ceph_caps_finalize(mdsc);
4916         ceph_pool_perm_destroy(mdsc);
4917 }
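
/*
 * The subtlety flush_delayed_work() covers above, in sketch form, for a
 * self-re-arming work item:
 *
 *	cancel_delayed_work_sync(&w);	- waits for the running instance,
 *					  but (per the comment above) it
 *					  may already have re-armed
 *	flush_delayed_work(&w);		- also runs any such re-armed
 *					  instance to completion
 *
 * and delayed_work() itself bails out once mdsc->stopping reaches
 * CEPH_MDSC_STOPPING_FLUSHED, so after the flush nothing re-arms again.
 */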
4918
4919 void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
4920 {
4921         struct ceph_mds_client *mdsc = fsc->mdsc;
4922         dout("mdsc_destroy %p\n", mdsc);
4923
4924         if (!mdsc)
4925                 return;
4926
4927         /* flush out any connection work with references to us */
4928         ceph_msgr_flush();
4929
4930         ceph_mdsc_stop(mdsc);
4931
4932         ceph_metric_destroy(&mdsc->metric);
4933
4934         fsc->mdsc = NULL;
4935         kfree(mdsc);
4936         dout("mdsc_destroy %p done\n", mdsc);
4937 }
4938
4939 void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
4940 {
4941         struct ceph_fs_client *fsc = mdsc->fsc;
4942         const char *mds_namespace = fsc->mount_options->mds_namespace;
4943         void *p = msg->front.iov_base;
4944         void *end = p + msg->front.iov_len;
4945         u32 epoch;
4946         u32 map_len;
4947         u32 num_fs;
4948         u32 mount_fscid = (u32)-1;
4949         u8 struct_v, struct_cv;
4950         int err = -EINVAL;
4951
4952         ceph_decode_need(&p, end, sizeof(u32), bad);
4953         epoch = ceph_decode_32(&p);
4954
4955         dout("handle_fsmap epoch %u\n", epoch);
4956
4957         ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
4958         struct_v = ceph_decode_8(&p);
4959         struct_cv = ceph_decode_8(&p);
4960         map_len = ceph_decode_32(&p);
4961
4962         ceph_decode_need(&p, end, sizeof(u32) * 3, bad);
4963         p += sizeof(u32) * 2; /* skip epoch and legacy_client_fscid */
4964
4965         num_fs = ceph_decode_32(&p);
4966         while (num_fs-- > 0) {
4967                 void *info_p, *info_end;
4968                 u32 info_len;
4969                 u8 info_v, info_cv;
4970                 u32 fscid, namelen;
4971
4972                 ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
4973                 info_v = ceph_decode_8(&p);
4974                 info_cv = ceph_decode_8(&p);
4975                 info_len = ceph_decode_32(&p);
4976                 ceph_decode_need(&p, end, info_len, bad);
4977                 info_p = p;
4978                 info_end = p + info_len;
4979                 p = info_end;
4980
4981                 ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
4982                 fscid = ceph_decode_32(&info_p);
4983                 namelen = ceph_decode_32(&info_p);
4984                 ceph_decode_need(&info_p, info_end, namelen, bad);
4985
4986                 if (mds_namespace &&
4987                     strlen(mds_namespace) == namelen &&
4988                     !strncmp(mds_namespace, (char *)info_p, namelen)) {
4989                         mount_fscid = fscid;
4990                         break;
4991                 }
4992         }
4993
4994         ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
4995         if (mount_fscid != (u32)-1) {
4996                 fsc->client->monc.fs_cluster_id = mount_fscid;
4997                 ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
4998                                    0, true);
4999                 ceph_monc_renew_subs(&fsc->client->monc);
5000         } else {
5001                 err = -ENOENT;
5002                 goto err_out;
5003         }
5004         return;
5005
5006 bad:
5007         pr_err("error decoding fsmap\n");
5008 err_out:
5009         mutex_lock(&mdsc->mutex);
5010         mdsc->mdsmap_err = err;
5011         __wake_requests(mdsc, &mdsc->waiting_for_map);
5012         mutex_unlock(&mdsc->mutex);
5013 }
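
/*
 * The (struct_v, struct_cv, map_len) triple above, and the per-filesystem
 * (info_v, info_cv, info_len) triple inside the loop, follow ceph's
 * versioned encoding envelope:
 *
 *	u8  version;	- encoding version
 *	u8  compat;	- oldest version able to decode this payload
 *	u32 len;	- payload length in bytes
 *
 * so a decoder that doesn't understand a newer version can still skip the
 * whole payload, which is what advancing p to info_end does above.
 */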
5014
5015 /*
5016  * handle mds map update.
5017  */
5018 void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
5019 {
5020         u32 epoch;
5021         u32 maplen;
5022         void *p = msg->front.iov_base;
5023         void *end = p + msg->front.iov_len;
5024         struct ceph_mdsmap *newmap, *oldmap;
5025         struct ceph_fsid fsid;
5026         int err = -EINVAL;
5027
5028         ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
5029         ceph_decode_copy(&p, &fsid, sizeof(fsid));
5030         if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
5031                 return;
5032         epoch = ceph_decode_32(&p);
5033         maplen = ceph_decode_32(&p);
5034         dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
5035
5036         /* do we need it? */
5037         mutex_lock(&mdsc->mutex);
5038         if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
5039                 dout("handle_map epoch %u <= our %u\n",
5040                      epoch, mdsc->mdsmap->m_epoch);
5041                 mutex_unlock(&mdsc->mutex);
5042                 return;
5043         }
5044
5045         newmap = ceph_mdsmap_decode(&p, end);
5046         if (IS_ERR(newmap)) {
5047                 err = PTR_ERR(newmap);
5048                 goto bad_unlock;
5049         }
5050
5051         /* swap into place */
5052         if (mdsc->mdsmap) {
5053                 oldmap = mdsc->mdsmap;
5054                 mdsc->mdsmap = newmap;
5055                 check_new_map(mdsc, newmap, oldmap);
5056                 ceph_mdsmap_destroy(oldmap);
5057         } else {
5058                 mdsc->mdsmap = newmap;  /* first mds map */
5059         }
5060         mdsc->fsc->max_file_size = min((loff_t)mdsc->mdsmap->m_max_file_size,
5061                                         MAX_LFS_FILESIZE);
5062
5063         __wake_requests(mdsc, &mdsc->waiting_for_map);
5064         ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
5065                           mdsc->mdsmap->m_epoch);
5066
5067         mutex_unlock(&mdsc->mutex);
5068         schedule_delayed(mdsc, 0);
5069         return;
5070
5071 bad_unlock:
5072         mutex_unlock(&mdsc->mutex);
5073 bad:
5074         pr_err("error decoding mdsmap %d\n", err);
5075         return;
5076 }
5077
5078 static struct ceph_connection *con_get(struct ceph_connection *con)
5079 {
5080         struct ceph_mds_session *s = con->private;
5081
5082         if (ceph_get_mds_session(s))
5083                 return con;
5084         return NULL;
5085 }
5086
5087 static void con_put(struct ceph_connection *con)
5088 {
5089         struct ceph_mds_session *s = con->private;
5090
5091         ceph_put_mds_session(s);
5092 }
5093
5094 /*
5095  * if the client is unresponsive for long enough, the mds will kill
5096  * the session entirely.
5097  */
5098 static void peer_reset(struct ceph_connection *con)
5099 {
5100         struct ceph_mds_session *s = con->private;
5101         struct ceph_mds_client *mdsc = s->s_mdsc;
5102
5103         pr_warn("mds%d closed our session\n", s->s_mds);
5104         send_mds_reconnect(mdsc, s);
5105 }
5106
5107 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
5108 {
5109         struct ceph_mds_session *s = con->private;
5110         struct ceph_mds_client *mdsc = s->s_mdsc;
5111         int type = le16_to_cpu(msg->hdr.type);
5112
5113         mutex_lock(&mdsc->mutex);
5114         if (__verify_registered_session(mdsc, s) < 0) {
5115                 mutex_unlock(&mdsc->mutex);
5116                 goto out;
5117         }
5118         mutex_unlock(&mdsc->mutex);
5119
5120         switch (type) {
5121         case CEPH_MSG_MDS_MAP:
5122                 ceph_mdsc_handle_mdsmap(mdsc, msg);
5123                 break;
5124         case CEPH_MSG_FS_MAP_USER:
5125                 ceph_mdsc_handle_fsmap(mdsc, msg);
5126                 break;
5127         case CEPH_MSG_CLIENT_SESSION:
5128                 handle_session(s, msg);
5129                 break;
5130         case CEPH_MSG_CLIENT_REPLY:
5131                 handle_reply(s, msg);
5132                 break;
5133         case CEPH_MSG_CLIENT_REQUEST_FORWARD:
5134                 handle_forward(mdsc, s, msg);
5135                 break;
5136         case CEPH_MSG_CLIENT_CAPS:
5137                 ceph_handle_caps(s, msg);
5138                 break;
5139         case CEPH_MSG_CLIENT_SNAP:
5140                 ceph_handle_snap(mdsc, s, msg);
5141                 break;
5142         case CEPH_MSG_CLIENT_LEASE:
5143                 handle_lease(mdsc, s, msg);
5144                 break;
5145         case CEPH_MSG_CLIENT_QUOTA:
5146                 ceph_handle_quota(mdsc, s, msg);
5147                 break;
5148
5149         default:
5150                 pr_err("received unknown message type %d %s\n", type,
5151                        ceph_msg_type_name(type));
5152         }
5153 out:
5154         ceph_msg_put(msg);
5155 }
5156
5157 /*
5158  * authentication
5159  */
5160
5161 /*
5162  * Note: returned pointer is the address of a structure that's
5163  * managed separately.  Caller must *not* attempt to free it.
5164  */
5165 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
5166                                         int *proto, int force_new)
5167 {
5168         struct ceph_mds_session *s = con->private;
5169         struct ceph_mds_client *mdsc = s->s_mdsc;
5170         struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
5171         struct ceph_auth_handshake *auth = &s->s_auth;
5172
5173         if (force_new && auth->authorizer) {
5174                 ceph_auth_destroy_authorizer(auth->authorizer);
5175                 auth->authorizer = NULL;
5176         }
5177         if (!auth->authorizer) {
5178                 int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
5179                                                       auth);
5180                 if (ret)
5181                         return ERR_PTR(ret);
5182         } else {
5183                 int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_MDS,
5184                                                       auth);
5185                 if (ret)
5186                         return ERR_PTR(ret);
5187         }
5188         *proto = ac->protocol;
5189
5190         return auth;
5191 }
5192
5193 static int add_authorizer_challenge(struct ceph_connection *con,
5194                                     void *challenge_buf, int challenge_buf_len)
5195 {
5196         struct ceph_mds_session *s = con->private;
5197         struct ceph_mds_client *mdsc = s->s_mdsc;
5198         struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
5199
5200         return ceph_auth_add_authorizer_challenge(ac, s->s_auth.authorizer,
5201                                             challenge_buf, challenge_buf_len);
5202 }
5203
5204 static int verify_authorizer_reply(struct ceph_connection *con)
5205 {
5206         struct ceph_mds_session *s = con->private;
5207         struct ceph_mds_client *mdsc = s->s_mdsc;
5208         struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
5209
5210         return ceph_auth_verify_authorizer_reply(ac, s->s_auth.authorizer);
5211 }
5212
5213 static int invalidate_authorizer(struct ceph_connection *con)
5214 {
5215         struct ceph_mds_session *s = con->private;
5216         struct ceph_mds_client *mdsc = s->s_mdsc;
5217         struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
5218
5219         ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
5220
5221         return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
5222 }
5223
5224 static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con,
5225                                 struct ceph_msg_header *hdr, int *skip)
5226 {
5227         struct ceph_msg *msg;
5228         int type = (int) le16_to_cpu(hdr->type);
5229         int front_len = (int) le32_to_cpu(hdr->front_len);
5230
5231         if (con->in_msg)
5232                 return con->in_msg;
5233
5234         *skip = 0;
5235         msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
5236         if (!msg) {
5237                 pr_err("unable to allocate msg type %d len %d\n",
5238                        type, front_len);
5239                 return NULL;
5240         }
5241
5242         return msg;
5243 }
5244
5245 static int mds_sign_message(struct ceph_msg *msg)
5246 {
5247        struct ceph_mds_session *s = msg->con->private;
5248        struct ceph_auth_handshake *auth = &s->s_auth;
5249
5250        return ceph_auth_sign_message(auth, msg);
5251 }
5252
5253 static int mds_check_message_signature(struct ceph_msg *msg)
5254 {
5255        struct ceph_mds_session *s = msg->con->private;
5256        struct ceph_auth_handshake *auth = &s->s_auth;
5257
5258        return ceph_auth_check_message_signature(auth, msg);
5259 }
5260
5261 static const struct ceph_connection_operations mds_con_ops = {
5262         .get = con_get,
5263         .put = con_put,
5264         .dispatch = dispatch,
5265         .get_authorizer = get_authorizer,
5266         .add_authorizer_challenge = add_authorizer_challenge,
5267         .verify_authorizer_reply = verify_authorizer_reply,
5268         .invalidate_authorizer = invalidate_authorizer,
5269         .peer_reset = peer_reset,
5270         .alloc_msg = mds_alloc_msg,
5271         .sign_message = mds_sign_message,
5272         .check_message_signature = mds_check_message_signature,
5273 };
5274
5275 /* eof */