GNU Linux-libre 5.4.257-gnu1
[releases.git] / fs / io_uring.c
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Shared application/kernel submission and completion ring pairs, for
4  * supporting fast/efficient IO.
5  *
6  * A note on the read/write ordering memory barriers that are matched between
7  * the application and kernel side.
8  *
9  * After the application reads the CQ ring tail, it must use an
10  * appropriate smp_rmb() to pair with the smp_wmb() the kernel uses
11  * before writing the tail (using smp_load_acquire to read the tail will
12  * do). It also needs a smp_mb() before updating CQ head (ordering the
13  * entry load(s) with the head store), pairing with an implicit barrier
14  * through a control-dependency in io_get_cqring (smp_store_release to
15  * store head will do). Failure to do so could lead to reading invalid
16  * CQ entries.
17  *
18  * Likewise, the application must use an appropriate smp_wmb() before
19  * writing the SQ tail (ordering SQ entry stores with the tail store),
20  * which pairs with smp_load_acquire in io_get_sqring (smp_store_release
21  * to store the tail will do). And it needs a barrier ordering the SQ
22  * head load before writing new SQ entries (smp_load_acquire to read
23  * head will do).
24  *
25  * When using the SQ poll thread (IORING_SETUP_SQPOLL), the application
26  * needs to check the SQ flags for IORING_SQ_NEED_WAKEUP *after*
27  * updating the SQ tail; a full memory barrier smp_mb() is needed
28  * between.
29  *
30  * Also see the examples in the liburing library:
31  *
32  *      git://git.kernel.dk/liburing
33  *
34  * io_uring also uses READ/WRITE_ONCE() for _any_ store or load that happens
35  * from data shared between the kernel and application. This is done both
36  * for ordering purposes, but also to ensure that once a value is loaded from
37  * data that the application could potentially modify, it remains stable.
38  *
39  * Copyright (C) 2018-2019 Jens Axboe
40  * Copyright (c) 2018-2019 Christoph Hellwig
41  */
42 #include <linux/kernel.h>
43 #include <linux/init.h>
44 #include <linux/errno.h>
45 #include <linux/syscalls.h>
46 #include <linux/compat.h>
47 #include <linux/refcount.h>
48 #include <linux/uio.h>
49
50 #include <linux/sched/signal.h>
51 #include <linux/fs.h>
52 #include <linux/file.h>
53 #include <linux/fdtable.h>
54 #include <linux/mm.h>
55 #include <linux/mman.h>
56 #include <linux/mmu_context.h>
57 #include <linux/percpu.h>
58 #include <linux/slab.h>
59 #include <linux/workqueue.h>
60 #include <linux/kthread.h>
61 #include <linux/blkdev.h>
62 #include <linux/bvec.h>
63 #include <linux/net.h>
64 #include <net/sock.h>
65 #include <net/af_unix.h>
66 #include <net/scm.h>
67 #include <linux/anon_inodes.h>
68 #include <linux/sched/mm.h>
69 #include <linux/uaccess.h>
70 #include <linux/nospec.h>
71 #include <linux/sizes.h>
72 #include <linux/hugetlb.h>
73 #include <linux/highmem.h>
74 #include <linux/fs_struct.h>
75
76 #include <uapi/linux/io_uring.h>
77
78 #include "internal.h"
79
80 #define IORING_MAX_ENTRIES      32768
81 #define IORING_MAX_FIXED_FILES  1024
82
83 struct io_uring {
84         u32 head ____cacheline_aligned_in_smp;
85         u32 tail ____cacheline_aligned_in_smp;
86 };
87
88 /*
89  * This data is shared with the application through the mmap at offsets
90  * IORING_OFF_SQ_RING and IORING_OFF_CQ_RING.
91  *
92  * The offsets to the member fields are published through struct
93  * io_sqring_offsets when calling io_uring_setup.
94  */
95 struct io_rings {
96         /*
97          * Head and tail offsets into the ring; the offsets need to be
98          * masked to get valid indices.
99          *
100          * The kernel controls head of the sq ring and the tail of the cq ring,
101          * and the application controls tail of the sq ring and the head of the
102          * cq ring.
103          */
104         struct io_uring         sq, cq;
105         /*
106          * Bitmasks to apply to head and tail offsets (constant, equals
107          * ring_entries - 1)
108          */
109         u32                     sq_ring_mask, cq_ring_mask;
110         /* Ring sizes (constant, power of 2) */
111         u32                     sq_ring_entries, cq_ring_entries;
112         /*
113          * Number of invalid entries dropped by the kernel due to
114          * invalid index stored in array
115          *
116          * Written by the kernel, shouldn't be modified by the
117          * application (i.e. get number of "new events" by comparing to
118          * cached value).
119          *
120          * After a new SQ head value was read by the application this
121          * counter includes all submissions that were dropped reaching
122          * the new SQ head (and possibly more).
123          */
124         u32                     sq_dropped;
125         /*
126          * Runtime flags
127          *
128          * Written by the kernel, shouldn't be modified by the
129          * application.
130          *
131          * The application needs a full memory barrier before checking
132          * for IORING_SQ_NEED_WAKEUP after updating the sq tail.
133          */
134         u32                     sq_flags;
135         /*
136          * Number of completion events lost because the queue was full;
137          * this should be avoided by the application by making sure
138          * there are not more requests pending thatn there is space in
139          * the completion queue.
140          *
141          * Written by the kernel, shouldn't be modified by the
142          * application (i.e. get number of "new events" by comparing to
143          * cached value).
144          *
145          * As completion events come in out of order this counter is not
146          * ordered with any other data.
147          */
148         u32                     cq_overflow;
149         /*
150          * Ring buffer of completion events.
151          *
152          * The kernel writes completion events fresh every time they are
153          * produced, so the application is allowed to modify pending
154          * entries.
155          */
156         struct io_uring_cqe     cqes[] ____cacheline_aligned_in_smp;
157 };
158
159 struct io_mapped_ubuf {
160         u64             ubuf;
161         size_t          len;
162         struct          bio_vec *bvec;
163         unsigned int    nr_bvecs;
164 };
165
166 struct async_list {
167         spinlock_t              lock;
168         atomic_t                cnt;
169         struct list_head        list;
170
171         struct file             *file;
172         off_t                   io_start;
173         size_t                  io_len;
174 };
175
176 struct io_ring_ctx {
177         struct {
178                 struct percpu_ref       refs;
179         } ____cacheline_aligned_in_smp;
180
181         struct {
182                 unsigned int            flags;
183                 bool                    compat;
184                 bool                    account_mem;
185
186                 /*
187                  * Ring buffer of indices into array of io_uring_sqe, which is
188                  * mmapped by the application using the IORING_OFF_SQES offset.
189                  *
190                  * This indirection could e.g. be used to assign fixed
191                  * io_uring_sqe entries to operations and only submit them to
192                  * the queue when needed.
193                  *
194                  * The kernel modifies neither the indices array nor the entries
195                  * array.
196                  */
197                 u32                     *sq_array;
198                 unsigned                cached_sq_head;
199                 unsigned                sq_entries;
200                 unsigned                sq_mask;
201                 unsigned                sq_thread_idle;
202                 unsigned                cached_sq_dropped;
203                 struct io_uring_sqe     *sq_sqes;
204
205                 struct list_head        defer_list;
206                 struct list_head        timeout_list;
207         } ____cacheline_aligned_in_smp;
208
209         /* IO offload */
210         struct workqueue_struct *sqo_wq[2];
211         struct task_struct      *sqo_thread;    /* if using sq thread polling */
212         struct mm_struct        *sqo_mm;
213         wait_queue_head_t       sqo_wait;
214         struct completion       sqo_thread_started;
215
216         struct {
217                 unsigned                cached_cq_tail;
218                 atomic_t                cached_cq_overflow;
219                 unsigned                cq_entries;
220                 unsigned                cq_mask;
221                 struct wait_queue_head  cq_wait;
222                 struct fasync_struct    *cq_fasync;
223                 struct eventfd_ctx      *cq_ev_fd;
224                 atomic_t                cq_timeouts;
225         } ____cacheline_aligned_in_smp;
226
227         struct io_rings *rings;
228
229         /*
230          * If used, fixed file set. Writers must ensure that ->refs is dead,
231          * readers must ensure that ->refs is alive as long as the file* is
232          * used. Only updated through io_uring_register(2).
233          */
234         struct file             **user_files;
235         unsigned                nr_user_files;
236
237         /* if used, fixed mapped user buffers */
238         unsigned                nr_user_bufs;
239         struct io_mapped_ubuf   *user_bufs;
240
241         struct user_struct      *user;
242
243         const struct cred       *creds;
244
245         struct completion       ctx_done;
246
247         struct {
248                 struct mutex            uring_lock;
249                 wait_queue_head_t       wait;
250         } ____cacheline_aligned_in_smp;
251
252         struct {
253                 spinlock_t              completion_lock;
254                 bool                    poll_multi_file;
255                 /*
256                  * ->poll_list is protected by the ctx->uring_lock for
257                  * io_uring instances that don't use IORING_SETUP_SQPOLL.
258                  * For SQPOLL, only the single threaded io_sq_thread() will
259                  * manipulate the list, hence no extra locking is needed there.
260                  */
261                 struct list_head        poll_list;
262                 struct list_head        cancel_list;
263         } ____cacheline_aligned_in_smp;
264
265         struct async_list       pending_async[2];
266
267 #if defined(CONFIG_UNIX)
268         struct socket           *ring_sock;
269 #endif
270
271         struct list_head        task_list;
272         spinlock_t              task_lock;
273 };
274
275 struct sqe_submit {
276         const struct io_uring_sqe       *sqe;
277         unsigned short                  index;
278         u32                             sequence;
279         bool                            has_user;
280         bool                            needs_lock;
281         bool                            needs_fixed_file;
282         u8                              opcode;
283 };
284
285 /*
286  * First field must be the file pointer in all the
287  * iocb unions! See also 'struct kiocb' in <linux/fs.h>
288  */
289 struct io_poll_iocb {
290         struct file                     *file;
291         struct wait_queue_head          *head;
292         __poll_t                        events;
293         bool                            done;
294         bool                            canceled;
295         struct wait_queue_entry         wait;
296 };
297
298 struct io_timeout {
299         struct file                     *file;
300         struct hrtimer                  timer;
301 };
302
303 /*
304  * NOTE! Each of the iocb union members has the file pointer
305  * as the first entry in their struct definition. So you can
306  * access the file pointer through any of the sub-structs,
307  * or directly as just 'ki_filp' in this struct.
308  */
309 struct io_kiocb {
310         union {
311                 struct file             *file;
312                 struct kiocb            rw;
313                 struct io_poll_iocb     poll;
314                 struct io_timeout       timeout;
315         };
316
317         struct sqe_submit       submit;
318
319         struct io_ring_ctx      *ctx;
320         struct list_head        list;
321         struct list_head        link_list;
322         unsigned int            flags;
323         refcount_t              refs;
324 #define REQ_F_NOWAIT            1       /* must not punt to workers */
325 #define REQ_F_IOPOLL_COMPLETED  2       /* polled IO has completed */
326 #define REQ_F_FIXED_FILE        4       /* ctx owns file */
327 #define REQ_F_SEQ_PREV          8       /* sequential with previous */
328 #define REQ_F_IO_DRAIN          16      /* drain existing IO first */
329 #define REQ_F_IO_DRAINED        32      /* drain done */
330 #define REQ_F_LINK              64      /* linked sqes */
331 #define REQ_F_LINK_DONE         128     /* linked sqes done */
332 #define REQ_F_FAIL_LINK         256     /* fail rest of links */
333 #define REQ_F_SHADOW_DRAIN      512     /* link-drain shadow req */
334 #define REQ_F_TIMEOUT           1024    /* timeout request */
335 #define REQ_F_ISREG             2048    /* regular file */
336 #define REQ_F_MUST_PUNT         4096    /* must be punted even for NONBLOCK */
337 #define REQ_F_TIMEOUT_NOSEQ     8192    /* no timeout sequence */
338 #define REQ_F_CANCEL            16384   /* cancel request */
339         unsigned long           fsize;
340         u64                     user_data;
341         u32                     result;
342         u32                     sequence;
343         struct files_struct     *files;
344
345         struct fs_struct        *fs;
346
347         struct work_struct      work;
348         struct task_struct      *work_task;
349         struct list_head        task_list;
350 };
351
352 #define IO_PLUG_THRESHOLD               2
353 #define IO_IOPOLL_BATCH                 8
354
355 struct io_submit_state {
356         struct blk_plug         plug;
357
358         /*
359          * io_kiocb alloc cache
360          */
361         void                    *reqs[IO_IOPOLL_BATCH];
362         unsigned                int free_reqs;
363         unsigned                int cur_req;
364
365         /*
366          * File reference cache
367          */
368         struct file             *file;
369         unsigned int            fd;
370         unsigned int            has_refs;
371         unsigned int            used_refs;
372         unsigned int            ios_left;
373 };
374
375 static void io_sq_wq_submit_work(struct work_struct *work);
376 static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
377                                  long res);
378 static void __io_free_req(struct io_kiocb *req);
379
380 static struct kmem_cache *req_cachep;
381
382 static const struct file_operations io_uring_fops;
383
384 struct sock *io_uring_get_socket(struct file *file)
385 {
386 #if defined(CONFIG_UNIX)
387         if (file->f_op == &io_uring_fops) {
388                 struct io_ring_ctx *ctx = file->private_data;
389
390                 return ctx->ring_sock->sk;
391         }
392 #endif
393         return NULL;
394 }
395 EXPORT_SYMBOL(io_uring_get_socket);
396
397 static void io_ring_ctx_ref_free(struct percpu_ref *ref)
398 {
399         struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs);
400
401         complete(&ctx->ctx_done);
402 }
403
404 static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
405 {
406         struct io_ring_ctx *ctx;
407         int i;
408
409         ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
410         if (!ctx)
411                 return NULL;
412
413         if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free,
414                             PERCPU_REF_ALLOW_REINIT, GFP_KERNEL)) {
415                 kfree(ctx);
416                 return NULL;
417         }
418
419         ctx->flags = p->flags;
420         init_waitqueue_head(&ctx->sqo_wait);
421         init_waitqueue_head(&ctx->cq_wait);
422         init_completion(&ctx->ctx_done);
423         init_completion(&ctx->sqo_thread_started);
424         mutex_init(&ctx->uring_lock);
425         init_waitqueue_head(&ctx->wait);
426         for (i = 0; i < ARRAY_SIZE(ctx->pending_async); i++) {
427                 spin_lock_init(&ctx->pending_async[i].lock);
428                 INIT_LIST_HEAD(&ctx->pending_async[i].list);
429                 atomic_set(&ctx->pending_async[i].cnt, 0);
430         }
431         spin_lock_init(&ctx->completion_lock);
432         INIT_LIST_HEAD(&ctx->poll_list);
433         INIT_LIST_HEAD(&ctx->cancel_list);
434         INIT_LIST_HEAD(&ctx->defer_list);
435         INIT_LIST_HEAD(&ctx->timeout_list);
436         INIT_LIST_HEAD(&ctx->task_list);
437         spin_lock_init(&ctx->task_lock);
438         return ctx;
439 }
440
441 static void io_req_put_fs(struct io_kiocb *req)
442 {
443         struct fs_struct *fs = req->fs;
444
445         if (!fs)
446                 return;
447
448         spin_lock(&req->fs->lock);
449         if (--fs->users)
450                 fs = NULL;
451         spin_unlock(&req->fs->lock);
452         if (fs)
453                 free_fs_struct(fs);
454         req->fs = NULL;
455 }
456
457 static inline bool __io_sequence_defer(struct io_ring_ctx *ctx,
458                                        struct io_kiocb *req)
459 {
460         return req->sequence != ctx->cached_cq_tail + ctx->cached_sq_dropped
461                                         + atomic_read(&ctx->cached_cq_overflow);
462 }
463
464 static inline bool io_sequence_defer(struct io_ring_ctx *ctx,
465                                      struct io_kiocb *req)
466 {
467         if ((req->flags & (REQ_F_IO_DRAIN|REQ_F_IO_DRAINED)) != REQ_F_IO_DRAIN)
468                 return false;
469
470         return __io_sequence_defer(ctx, req);
471 }
472
473 static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx)
474 {
475         struct io_kiocb *req;
476
477         req = list_first_entry_or_null(&ctx->defer_list, struct io_kiocb, list);
478         if (req && !io_sequence_defer(ctx, req)) {
479                 list_del_init(&req->list);
480                 return req;
481         }
482
483         return NULL;
484 }
485
486 static struct io_kiocb *io_get_timeout_req(struct io_ring_ctx *ctx)
487 {
488         struct io_kiocb *req;
489
490         req = list_first_entry_or_null(&ctx->timeout_list, struct io_kiocb, list);
491         if (req) {
492                 if (req->flags & REQ_F_TIMEOUT_NOSEQ)
493                         return NULL;
494                 if (!__io_sequence_defer(ctx, req)) {
495                         list_del_init(&req->list);
496                         return req;
497                 }
498         }
499
500         return NULL;
501 }
502
503 static void __io_commit_cqring(struct io_ring_ctx *ctx)
504 {
505         struct io_rings *rings = ctx->rings;
506
507         if (ctx->cached_cq_tail != READ_ONCE(rings->cq.tail)) {
508                 /* order cqe stores with ring update */
509                 smp_store_release(&rings->cq.tail, ctx->cached_cq_tail);
510
511                 if (wq_has_sleeper(&ctx->cq_wait)) {
512                         wake_up_interruptible(&ctx->cq_wait);
513                         kill_fasync(&ctx->cq_fasync, SIGIO, POLL_IN);
514                 }
515         }
516 }
517
518 static inline void io_queue_async_work(struct io_ring_ctx *ctx,
519                                        struct io_kiocb *req)
520 {
521         unsigned long flags;
522         int rw = 0;
523
524         if (req->submit.sqe) {
525                 switch (req->submit.opcode) {
526                 case IORING_OP_WRITEV:
527                 case IORING_OP_WRITE_FIXED:
528                         rw = !(req->rw.ki_flags & IOCB_DIRECT);
529                         break;
530                 }
531         }
532
533         if (req->work.func == io_sq_wq_submit_work) {
534                 req->files = current->files;
535
536                 spin_lock_irqsave(&ctx->task_lock, flags);
537                 list_add(&req->task_list, &ctx->task_list);
538                 req->work_task = NULL;
539                 spin_unlock_irqrestore(&ctx->task_lock, flags);
540         }
541
542         queue_work(ctx->sqo_wq[rw], &req->work);
543 }
544
545 static void io_kill_timeout(struct io_kiocb *req)
546 {
547         int ret;
548
549         ret = hrtimer_try_to_cancel(&req->timeout.timer);
550         if (ret != -1) {
551                 atomic_inc(&req->ctx->cq_timeouts);
552                 list_del(&req->list);
553                 io_cqring_fill_event(req->ctx, req->user_data, 0);
554                 if (refcount_dec_and_test(&req->refs))
555                         __io_free_req(req);
556         }
557 }
558
559 static void io_kill_timeouts(struct io_ring_ctx *ctx)
560 {
561         struct io_kiocb *req, *tmp;
562
563         spin_lock_irq(&ctx->completion_lock);
564         list_for_each_entry_safe(req, tmp, &ctx->timeout_list, list)
565                 io_kill_timeout(req);
566         spin_unlock_irq(&ctx->completion_lock);
567 }
568
569 static void io_commit_cqring(struct io_ring_ctx *ctx)
570 {
571         struct io_kiocb *req;
572
573         while ((req = io_get_timeout_req(ctx)) != NULL)
574                 io_kill_timeout(req);
575
576         __io_commit_cqring(ctx);
577
578         while ((req = io_get_deferred_req(ctx)) != NULL) {
579                 if (req->flags & REQ_F_SHADOW_DRAIN) {
580                         /* Just for drain, free it. */
581                         __io_free_req(req);
582                         continue;
583                 }
584                 req->flags |= REQ_F_IO_DRAINED;
585                 io_queue_async_work(ctx, req);
586         }
587 }
588
589 static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
590 {
591         struct io_rings *rings = ctx->rings;
592         unsigned tail;
593
594         tail = ctx->cached_cq_tail;
595         /*
596          * writes to the cq entry need to come after reading head; the
597          * control dependency is enough as we're using WRITE_ONCE to
598          * fill the cq entry
599          */
600         if (tail - READ_ONCE(rings->cq.head) == rings->cq_ring_entries)
601                 return NULL;
602
603         ctx->cached_cq_tail++;
604         return &rings->cqes[tail & ctx->cq_mask];
605 }
606
607 static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
608                                  long res)
609 {
610         struct io_uring_cqe *cqe;
611
612         /*
613          * If we can't get a cq entry, userspace overflowed the
614          * submission (by quite a lot). Increment the overflow count in
615          * the ring.
616          */
617         cqe = io_get_cqring(ctx);
618         if (cqe) {
619                 WRITE_ONCE(cqe->user_data, ki_user_data);
620                 WRITE_ONCE(cqe->res, res);
621                 WRITE_ONCE(cqe->flags, 0);
622         } else {
623                 WRITE_ONCE(ctx->rings->cq_overflow,
624                                 atomic_inc_return(&ctx->cached_cq_overflow));
625         }
626 }
627
628 static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
629 {
630         if (waitqueue_active(&ctx->wait))
631                 wake_up(&ctx->wait);
632         if (waitqueue_active(&ctx->sqo_wait))
633                 wake_up(&ctx->sqo_wait);
634         if (ctx->cq_ev_fd)
635                 eventfd_signal(ctx->cq_ev_fd, 1);
636 }
637
638 static void io_cqring_add_event(struct io_ring_ctx *ctx, u64 user_data,
639                                 long res)
640 {
641         unsigned long flags;
642
643         spin_lock_irqsave(&ctx->completion_lock, flags);
644         io_cqring_fill_event(ctx, user_data, res);
645         io_commit_cqring(ctx);
646         spin_unlock_irqrestore(&ctx->completion_lock, flags);
647
648         io_cqring_ev_posted(ctx);
649 }
650
651 static struct io_kiocb *io_get_req(struct io_ring_ctx *ctx,
652                                    struct io_submit_state *state)
653 {
654         gfp_t gfp = GFP_KERNEL | __GFP_NOWARN;
655         struct io_kiocb *req;
656
657         if (!percpu_ref_tryget(&ctx->refs))
658                 return NULL;
659
660         if (!state) {
661                 req = kmem_cache_alloc(req_cachep, gfp);
662                 if (unlikely(!req))
663                         goto out;
664         } else if (!state->free_reqs) {
665                 size_t sz;
666                 int ret;
667
668                 sz = min_t(size_t, state->ios_left, ARRAY_SIZE(state->reqs));
669                 ret = kmem_cache_alloc_bulk(req_cachep, gfp, sz, state->reqs);
670
671                 /*
672                  * Bulk alloc is all-or-nothing. If we fail to get a batch,
673                  * retry single alloc to be on the safe side.
674                  */
675                 if (unlikely(ret <= 0)) {
676                         state->reqs[0] = kmem_cache_alloc(req_cachep, gfp);
677                         if (!state->reqs[0])
678                                 goto out;
679                         ret = 1;
680                 }
681                 state->free_reqs = ret - 1;
682                 state->cur_req = 1;
683                 req = state->reqs[0];
684         } else {
685                 req = state->reqs[state->cur_req];
686                 state->free_reqs--;
687                 state->cur_req++;
688         }
689
690         INIT_LIST_HEAD(&req->task_list);
691         req->file = NULL;
692         req->ctx = ctx;
693         req->flags = 0;
694         /* one is dropped after submission, the other at completion */
695         refcount_set(&req->refs, 2);
696         req->result = 0;
697         req->fs = NULL;
698         return req;
699 out:
700         percpu_ref_put(&ctx->refs);
701         return NULL;
702 }
703
704 static void io_free_req_many(struct io_ring_ctx *ctx, void **reqs, int *nr)
705 {
706         if (*nr) {
707                 kmem_cache_free_bulk(req_cachep, *nr, reqs);
708                 percpu_ref_put_many(&ctx->refs, *nr);
709                 *nr = 0;
710         }
711 }
712
713 static void __io_free_req(struct io_kiocb *req)
714 {
715         io_req_put_fs(req);
716         if (req->file && !(req->flags & REQ_F_FIXED_FILE))
717                 fput(req->file);
718         percpu_ref_put(&req->ctx->refs);
719         kmem_cache_free(req_cachep, req);
720 }
721
722 static void io_req_link_next(struct io_kiocb *req)
723 {
724         struct io_kiocb *nxt;
725
726         /*
727          * The list should never be empty when we are called here. But could
728          * potentially happen if the chain is messed up, check to be on the
729          * safe side.
730          */
731         nxt = list_first_entry_or_null(&req->link_list, struct io_kiocb, list);
732         if (nxt) {
733                 list_del(&nxt->list);
734                 if (!list_empty(&req->link_list)) {
735                         INIT_LIST_HEAD(&nxt->link_list);
736                         list_splice(&req->link_list, &nxt->link_list);
737                         nxt->flags |= REQ_F_LINK;
738                 }
739
740                 nxt->flags |= REQ_F_LINK_DONE;
741                 INIT_WORK(&nxt->work, io_sq_wq_submit_work);
742                 io_queue_async_work(req->ctx, nxt);
743         }
744 }
745
746 /*
747  * Called if REQ_F_LINK is set, and we fail the head request
748  */
749 static void io_fail_links(struct io_kiocb *req)
750 {
751         struct io_kiocb *link;
752
753         while (!list_empty(&req->link_list)) {
754                 link = list_first_entry(&req->link_list, struct io_kiocb, list);
755                 list_del(&link->list);
756
757                 io_cqring_add_event(req->ctx, link->user_data, -ECANCELED);
758                 __io_free_req(link);
759         }
760 }
761
762 static void io_free_req(struct io_kiocb *req)
763 {
764         /*
765          * If LINK is set, we have dependent requests in this chain. If we
766          * didn't fail this request, queue the first one up, moving any other
767          * dependencies to the next request. In case of failure, fail the rest
768          * of the chain.
769          */
770         if (req->flags & REQ_F_LINK) {
771                 if (req->flags & REQ_F_FAIL_LINK)
772                         io_fail_links(req);
773                 else
774                         io_req_link_next(req);
775         }
776
777         __io_free_req(req);
778 }
779
780 static void io_put_req(struct io_kiocb *req)
781 {
782         if (refcount_dec_and_test(&req->refs))
783                 io_free_req(req);
784 }
785
786 static unsigned io_cqring_events(struct io_rings *rings)
787 {
788         /* See comment at the top of this file */
789         smp_rmb();
790         return READ_ONCE(rings->cq.tail) - READ_ONCE(rings->cq.head);
791 }
792
793 static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx)
794 {
795         struct io_rings *rings = ctx->rings;
796
797         /* make sure SQ entry isn't read before tail */
798         return smp_load_acquire(&rings->sq.tail) - ctx->cached_sq_head;
799 }
800
801 /*
802  * Find and free completed poll iocbs
803  */
804 static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
805                                struct list_head *done)
806 {
807         void *reqs[IO_IOPOLL_BATCH];
808         struct io_kiocb *req;
809         int to_free;
810
811         to_free = 0;
812         while (!list_empty(done)) {
813                 req = list_first_entry(done, struct io_kiocb, list);
814                 list_del(&req->list);
815
816                 io_cqring_fill_event(ctx, req->user_data, req->result);
817                 (*nr_events)++;
818
819                 if (refcount_dec_and_test(&req->refs)) {
820                         /* If we're not using fixed files, we have to pair the
821                          * completion part with the file put. Use regular
822                          * completions for those, only batch free for fixed
823                          * file and non-linked commands.
824                          */
825                         if ((req->flags & (REQ_F_FIXED_FILE|REQ_F_LINK)) ==
826                             REQ_F_FIXED_FILE) {
827                                 reqs[to_free++] = req;
828                                 if (to_free == ARRAY_SIZE(reqs))
829                                         io_free_req_many(ctx, reqs, &to_free);
830                         } else {
831                                 io_free_req(req);
832                         }
833                 }
834         }
835
836         io_commit_cqring(ctx);
837         io_free_req_many(ctx, reqs, &to_free);
838 }
839
840 static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events,
841                         long min)
842 {
843         struct io_kiocb *req, *tmp;
844         LIST_HEAD(done);
845         bool spin;
846         int ret;
847
848         /*
849          * Only spin for completions if we don't have multiple devices hanging
850          * off our complete list, and we're under the requested amount.
851          */
852         spin = !ctx->poll_multi_file && *nr_events < min;
853
854         ret = 0;
855         list_for_each_entry_safe(req, tmp, &ctx->poll_list, list) {
856                 struct kiocb *kiocb = &req->rw;
857
858                 /*
859                  * Move completed entries to our local list. If we find a
860                  * request that requires polling, break out and complete
861                  * the done list first, if we have entries there.
862                  */
863                 if (req->flags & REQ_F_IOPOLL_COMPLETED) {
864                         list_move_tail(&req->list, &done);
865                         continue;
866                 }
867                 if (!list_empty(&done))
868                         break;
869
870                 ret = kiocb->ki_filp->f_op->iopoll(kiocb, spin);
871                 if (ret < 0)
872                         break;
873
874                 if (ret && spin)
875                         spin = false;
876                 ret = 0;
877         }
878
879         if (!list_empty(&done))
880                 io_iopoll_complete(ctx, nr_events, &done);
881
882         return ret;
883 }
884
885 /*
886  * Poll for a mininum of 'min' events. Note that if min == 0 we consider that a
887  * non-spinning poll check - we'll still enter the driver poll loop, but only
888  * as a non-spinning completion check.
889  */
890 static int io_iopoll_getevents(struct io_ring_ctx *ctx, unsigned int *nr_events,
891                                 long min)
892 {
893         while (!list_empty(&ctx->poll_list) && !need_resched()) {
894                 int ret;
895
896                 ret = io_do_iopoll(ctx, nr_events, min);
897                 if (ret < 0)
898                         return ret;
899                 if (!min || *nr_events >= min)
900                         return 0;
901         }
902
903         return 1;
904 }
905
906 /*
907  * We can't just wait for polled events to come to us, we have to actively
908  * find and complete them.
909  */
910 static void io_iopoll_reap_events(struct io_ring_ctx *ctx)
911 {
912         if (!(ctx->flags & IORING_SETUP_IOPOLL))
913                 return;
914
915         mutex_lock(&ctx->uring_lock);
916         while (!list_empty(&ctx->poll_list)) {
917                 unsigned int nr_events = 0;
918
919                 io_iopoll_getevents(ctx, &nr_events, 1);
920
921                 /*
922                  * Ensure we allow local-to-the-cpu processing to take place,
923                  * in this case we need to ensure that we reap all events.
924                  */
925                 cond_resched();
926         }
927         mutex_unlock(&ctx->uring_lock);
928 }
929
930 static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
931                            long min)
932 {
933         int iters = 0, ret = 0;
934
935         /*
936          * We disallow the app entering submit/complete with polling, but we
937          * still need to lock the ring to prevent racing with polled issue
938          * that got punted to a workqueue.
939          */
940         mutex_lock(&ctx->uring_lock);
941         do {
942                 int tmin = 0;
943
944                 /*
945                  * Don't enter poll loop if we already have events pending.
946                  * If we do, we can potentially be spinning for commands that
947                  * already triggered a CQE (eg in error).
948                  */
949                 if (io_cqring_events(ctx->rings))
950                         break;
951
952                 /*
953                  * If a submit got punted to a workqueue, we can have the
954                  * application entering polling for a command before it gets
955                  * issued. That app will hold the uring_lock for the duration
956                  * of the poll right here, so we need to take a breather every
957                  * now and then to ensure that the issue has a chance to add
958                  * the poll to the issued list. Otherwise we can spin here
959                  * forever, while the workqueue is stuck trying to acquire the
960                  * very same mutex.
961                  */
962                 if (!(++iters & 7)) {
963                         mutex_unlock(&ctx->uring_lock);
964                         mutex_lock(&ctx->uring_lock);
965                 }
966
967                 if (*nr_events < min)
968                         tmin = min - *nr_events;
969
970                 ret = io_iopoll_getevents(ctx, nr_events, tmin);
971                 if (ret <= 0)
972                         break;
973                 ret = 0;
974         } while (min && !*nr_events && !need_resched());
975
976         mutex_unlock(&ctx->uring_lock);
977         return ret;
978 }
979
980 static void kiocb_end_write(struct io_kiocb *req)
981 {
982         /*
983          * Tell lockdep we inherited freeze protection from submission
984          * thread.
985          */
986         if (req->flags & REQ_F_ISREG) {
987                 struct inode *inode = file_inode(req->file);
988
989                 __sb_writers_acquired(inode->i_sb, SB_FREEZE_WRITE);
990         }
991         file_end_write(req->file);
992 }
993
994 static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
995 {
996         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
997
998         if (kiocb->ki_flags & IOCB_WRITE)
999                 kiocb_end_write(req);
1000
1001         if ((req->flags & REQ_F_LINK) && res != req->result)
1002                 req->flags |= REQ_F_FAIL_LINK;
1003         io_cqring_add_event(req->ctx, req->user_data, res);
1004         io_put_req(req);
1005 }
1006
1007 static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2)
1008 {
1009         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
1010
1011         if (kiocb->ki_flags & IOCB_WRITE)
1012                 kiocb_end_write(req);
1013
1014         if ((req->flags & REQ_F_LINK) && res != req->result)
1015                 req->flags |= REQ_F_FAIL_LINK;
1016         req->result = res;
1017         if (res != -EAGAIN)
1018                 req->flags |= REQ_F_IOPOLL_COMPLETED;
1019 }
1020
1021 /*
1022  * After the iocb has been issued, it's safe to be found on the poll list.
1023  * Adding the kiocb to the list AFTER submission ensures that we don't
1024  * find it from a io_iopoll_getevents() thread before the issuer is done
1025  * accessing the kiocb cookie.
1026  */
1027 static void io_iopoll_req_issued(struct io_kiocb *req)
1028 {
1029         struct io_ring_ctx *ctx = req->ctx;
1030
1031         /*
1032          * Track whether we have multiple files in our lists. This will impact
1033          * how we do polling eventually, not spinning if we're on potentially
1034          * different devices.
1035          */
1036         if (list_empty(&ctx->poll_list)) {
1037                 ctx->poll_multi_file = false;
1038         } else if (!ctx->poll_multi_file) {
1039                 struct io_kiocb *list_req;
1040
1041                 list_req = list_first_entry(&ctx->poll_list, struct io_kiocb,
1042                                                 list);
1043                 if (list_req->rw.ki_filp != req->rw.ki_filp)
1044                         ctx->poll_multi_file = true;
1045         }
1046
1047         /*
1048          * For fast devices, IO may have already completed. If it has, add
1049          * it to the front so we find it first.
1050          */
1051         if (req->flags & REQ_F_IOPOLL_COMPLETED)
1052                 list_add(&req->list, &ctx->poll_list);
1053         else
1054                 list_add_tail(&req->list, &ctx->poll_list);
1055 }
1056
1057 static void io_file_put(struct io_submit_state *state)
1058 {
1059         if (state->file) {
1060                 int diff = state->has_refs - state->used_refs;
1061
1062                 if (diff)
1063                         fput_many(state->file, diff);
1064                 state->file = NULL;
1065         }
1066 }
1067
1068 /*
1069  * Get as many references to a file as we have IOs left in this submission,
1070  * assuming most submissions are for one file, or at least that each file
1071  * has more than one submission.
1072  */
1073 static struct file *io_file_get(struct io_submit_state *state, int fd)
1074 {
1075         if (!state)
1076                 return fget(fd);
1077
1078         if (state->file) {
1079                 if (state->fd == fd) {
1080                         state->used_refs++;
1081                         state->ios_left--;
1082                         return state->file;
1083                 }
1084                 io_file_put(state);
1085         }
1086         state->file = fget_many(fd, state->ios_left);
1087         if (!state->file)
1088                 return NULL;
1089
1090         state->fd = fd;
1091         state->has_refs = state->ios_left;
1092         state->used_refs = 1;
1093         state->ios_left--;
1094         return state->file;
1095 }
1096
1097 /*
1098  * If we tracked the file through the SCM inflight mechanism, we could support
1099  * any file. For now, just ensure that anything potentially problematic is done
1100  * inline.
1101  */
1102 static bool io_file_supports_async(struct file *file)
1103 {
1104         umode_t mode = file_inode(file)->i_mode;
1105
1106         if (S_ISBLK(mode) || S_ISCHR(mode))
1107                 return true;
1108         if (S_ISREG(mode) && file->f_op != &io_uring_fops)
1109                 return true;
1110
1111         return false;
1112 }
1113
1114 static int io_prep_rw(struct io_kiocb *req, const struct sqe_submit *s,
1115                       bool force_nonblock)
1116 {
1117         const struct io_uring_sqe *sqe = s->sqe;
1118         struct io_ring_ctx *ctx = req->ctx;
1119         struct kiocb *kiocb = &req->rw;
1120         unsigned ioprio;
1121         int ret;
1122
1123         if (!req->file)
1124                 return -EBADF;
1125
1126         if (S_ISREG(file_inode(req->file)->i_mode))
1127                 req->flags |= REQ_F_ISREG;
1128
1129         if (force_nonblock)
1130                 req->fsize = rlimit(RLIMIT_FSIZE);
1131
1132         /*
1133          * If the file doesn't support async, mark it as REQ_F_MUST_PUNT so
1134          * we know to async punt it even if it was opened O_NONBLOCK
1135          */
1136         if (force_nonblock && !io_file_supports_async(req->file)) {
1137                 req->flags |= REQ_F_MUST_PUNT;
1138                 return -EAGAIN;
1139         }
1140
1141         kiocb->ki_pos = READ_ONCE(sqe->off);
1142         kiocb->ki_flags = iocb_flags(kiocb->ki_filp);
1143         kiocb->ki_hint = ki_hint_validate(file_write_hint(kiocb->ki_filp));
1144
1145         ioprio = READ_ONCE(sqe->ioprio);
1146         if (ioprio) {
1147                 ret = ioprio_check_cap(ioprio);
1148                 if (ret)
1149                         return ret;
1150
1151                 kiocb->ki_ioprio = ioprio;
1152         } else
1153                 kiocb->ki_ioprio = get_current_ioprio();
1154
1155         ret = kiocb_set_rw_flags(kiocb, READ_ONCE(sqe->rw_flags));
1156         if (unlikely(ret))
1157                 return ret;
1158
1159         /* don't allow async punt if RWF_NOWAIT was requested */
1160         if ((kiocb->ki_flags & IOCB_NOWAIT) ||
1161             (req->file->f_flags & O_NONBLOCK))
1162                 req->flags |= REQ_F_NOWAIT;
1163
1164         if (force_nonblock)
1165                 kiocb->ki_flags |= IOCB_NOWAIT;
1166
1167         if (ctx->flags & IORING_SETUP_IOPOLL) {
1168                 if (!(kiocb->ki_flags & IOCB_DIRECT) ||
1169                     !kiocb->ki_filp->f_op->iopoll)
1170                         return -EOPNOTSUPP;
1171
1172                 kiocb->ki_flags |= IOCB_HIPRI;
1173                 kiocb->ki_complete = io_complete_rw_iopoll;
1174                 req->result = 0;
1175         } else {
1176                 if (kiocb->ki_flags & IOCB_HIPRI)
1177                         return -EINVAL;
1178                 kiocb->ki_complete = io_complete_rw;
1179         }
1180         return 0;
1181 }
1182
1183 static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret)
1184 {
1185         switch (ret) {
1186         case -EIOCBQUEUED:
1187                 break;
1188         case -ERESTARTSYS:
1189         case -ERESTARTNOINTR:
1190         case -ERESTARTNOHAND:
1191         case -ERESTART_RESTARTBLOCK:
1192                 /*
1193                  * We can't just restart the syscall, since previously
1194                  * submitted sqes may already be in progress. Just fail this
1195                  * IO with EINTR.
1196                  */
1197                 ret = -EINTR;
1198                 /* fall through */
1199         default:
1200                 kiocb->ki_complete(kiocb, ret, 0);
1201         }
1202 }
1203
1204 static int io_import_fixed(struct io_ring_ctx *ctx, int rw,
1205                            const struct io_uring_sqe *sqe,
1206                            struct iov_iter *iter)
1207 {
1208         size_t len = READ_ONCE(sqe->len);
1209         struct io_mapped_ubuf *imu;
1210         unsigned index, buf_index;
1211         size_t offset;
1212         u64 buf_addr;
1213
1214         /* attempt to use fixed buffers without having provided iovecs */
1215         if (unlikely(!ctx->user_bufs))
1216                 return -EFAULT;
1217
1218         buf_index = READ_ONCE(sqe->buf_index);
1219         if (unlikely(buf_index >= ctx->nr_user_bufs))
1220                 return -EFAULT;
1221
1222         index = array_index_nospec(buf_index, ctx->nr_user_bufs);
1223         imu = &ctx->user_bufs[index];
1224         buf_addr = READ_ONCE(sqe->addr);
1225
1226         /* overflow */
1227         if (buf_addr + len < buf_addr)
1228                 return -EFAULT;
1229         /* not inside the mapped region */
1230         if (buf_addr < imu->ubuf || buf_addr + len > imu->ubuf + imu->len)
1231                 return -EFAULT;
1232
1233         /*
1234          * May not be a start of buffer, set size appropriately
1235          * and advance us to the beginning.
1236          */
1237         offset = buf_addr - imu->ubuf;
1238         iov_iter_bvec(iter, rw, imu->bvec, imu->nr_bvecs, offset + len);
1239
1240         if (offset) {
1241                 /*
1242                  * Don't use iov_iter_advance() here, as it's really slow for
1243                  * using the latter parts of a big fixed buffer - it iterates
1244                  * over each segment manually. We can cheat a bit here, because
1245                  * we know that:
1246                  *
1247                  * 1) it's a BVEC iter, we set it up
1248                  * 2) all bvecs are PAGE_SIZE in size, except potentially the
1249                  *    first and last bvec
1250                  *
1251                  * So just find our index, and adjust the iterator afterwards.
1252                  * If the offset is within the first bvec (or the whole first
1253                  * bvec, just use iov_iter_advance(). This makes it easier
1254                  * since we can just skip the first segment, which may not
1255                  * be PAGE_SIZE aligned.
1256                  */
1257                 const struct bio_vec *bvec = imu->bvec;
1258
1259                 if (offset <= bvec->bv_len) {
1260                         iov_iter_advance(iter, offset);
1261                 } else {
1262                         unsigned long seg_skip;
1263
1264                         /* skip first vec */
1265                         offset -= bvec->bv_len;
1266                         seg_skip = 1 + (offset >> PAGE_SHIFT);
1267
1268                         iter->bvec = bvec + seg_skip;
1269                         iter->nr_segs -= seg_skip;
1270                         iter->count -= bvec->bv_len + offset;
1271                         iter->iov_offset = offset & ~PAGE_MASK;
1272                 }
1273         }
1274
1275         return len;
1276 }
1277
1278 static ssize_t io_import_iovec(struct io_ring_ctx *ctx, int rw,
1279                                struct io_kiocb *req, struct iovec **iovec,
1280                                struct iov_iter *iter)
1281 {
1282         const struct io_uring_sqe *sqe = req->submit.sqe;
1283         void __user *buf = u64_to_user_ptr(READ_ONCE(sqe->addr));
1284         size_t sqe_len = READ_ONCE(sqe->len);
1285         u8 opcode;
1286
1287         opcode = req->submit.opcode;
1288         if (opcode == IORING_OP_READ_FIXED ||
1289             opcode == IORING_OP_WRITE_FIXED) {
1290                 ssize_t ret = io_import_fixed(ctx, rw, sqe, iter);
1291                 *iovec = NULL;
1292                 return ret;
1293         }
1294
1295         if (!req->submit.has_user)
1296                 return -EFAULT;
1297
1298 #ifdef CONFIG_COMPAT
1299         if (ctx->compat)
1300                 return compat_import_iovec(rw, buf, sqe_len, UIO_FASTIOV,
1301                                                 iovec, iter);
1302 #endif
1303
1304         return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter);
1305 }
1306
1307 static inline bool io_should_merge(struct async_list *al, struct kiocb *kiocb)
1308 {
1309         if (al->file == kiocb->ki_filp) {
1310                 off_t start, end;
1311
1312                 /*
1313                  * Allow merging if we're anywhere in the range of the same
1314                  * page. Generally this happens for sub-page reads or writes,
1315                  * and it's beneficial to allow the first worker to bring the
1316                  * page in and the piggy backed work can then work on the
1317                  * cached page.
1318                  */
1319                 start = al->io_start & PAGE_MASK;
1320                 end = (al->io_start + al->io_len + PAGE_SIZE - 1) & PAGE_MASK;
1321                 if (kiocb->ki_pos >= start && kiocb->ki_pos <= end)
1322                         return true;
1323         }
1324
1325         al->file = NULL;
1326         return false;
1327 }
1328
1329 /*
1330  * Make a note of the last file/offset/direction we punted to async
1331  * context. We'll use this information to see if we can piggy back a
1332  * sequential request onto the previous one, if it's still hasn't been
1333  * completed by the async worker.
1334  */
1335 static void io_async_list_note(int rw, struct io_kiocb *req, size_t len)
1336 {
1337         struct async_list *async_list = &req->ctx->pending_async[rw];
1338         struct kiocb *kiocb = &req->rw;
1339         struct file *filp = kiocb->ki_filp;
1340
1341         if (io_should_merge(async_list, kiocb)) {
1342                 unsigned long max_bytes;
1343
1344                 /* Use 8x RA size as a decent limiter for both reads/writes */
1345                 max_bytes = filp->f_ra.ra_pages << (PAGE_SHIFT + 3);
1346                 if (!max_bytes)
1347                         max_bytes = VM_READAHEAD_PAGES << (PAGE_SHIFT + 3);
1348
1349                 /* If max len are exceeded, reset the state */
1350                 if (async_list->io_len + len <= max_bytes) {
1351                         req->flags |= REQ_F_SEQ_PREV;
1352                         async_list->io_len += len;
1353                 } else {
1354                         async_list->file = NULL;
1355                 }
1356         }
1357
1358         /* New file? Reset state. */
1359         if (async_list->file != filp) {
1360                 async_list->io_start = kiocb->ki_pos;
1361                 async_list->io_len = len;
1362                 async_list->file = filp;
1363         }
1364 }
1365
1366 /*
1367  * For files that don't have ->read_iter() and ->write_iter(), handle them
1368  * by looping over ->read() or ->write() manually.
1369  */
1370 static ssize_t loop_rw_iter(int rw, struct file *file, struct kiocb *kiocb,
1371                            struct iov_iter *iter)
1372 {
1373         ssize_t ret = 0;
1374
1375         /*
1376          * Don't support polled IO through this interface, and we can't
1377          * support non-blocking either. For the latter, this just causes
1378          * the kiocb to be handled from an async context.
1379          */
1380         if (kiocb->ki_flags & IOCB_HIPRI)
1381                 return -EOPNOTSUPP;
1382         if (kiocb->ki_flags & IOCB_NOWAIT)
1383                 return -EAGAIN;
1384
1385         while (iov_iter_count(iter)) {
1386                 struct iovec iovec;
1387                 ssize_t nr;
1388
1389                 if (!iov_iter_is_bvec(iter)) {
1390                         iovec = iov_iter_iovec(iter);
1391                 } else {
1392                         /* fixed buffers import bvec */
1393                         iovec.iov_base = kmap(iter->bvec->bv_page)
1394                                                 + iter->iov_offset;
1395                         iovec.iov_len = min(iter->count,
1396                                         iter->bvec->bv_len - iter->iov_offset);
1397                 }
1398
1399                 if (rw == READ) {
1400                         nr = file->f_op->read(file, iovec.iov_base,
1401                                               iovec.iov_len, &kiocb->ki_pos);
1402                 } else {
1403                         nr = file->f_op->write(file, iovec.iov_base,
1404                                                iovec.iov_len, &kiocb->ki_pos);
1405                 }
1406
1407                 if (iov_iter_is_bvec(iter))
1408                         kunmap(iter->bvec->bv_page);
1409
1410                 if (nr < 0) {
1411                         if (!ret)
1412                                 ret = nr;
1413                         break;
1414                 }
1415                 ret += nr;
1416                 if (nr != iovec.iov_len)
1417                         break;
1418                 iov_iter_advance(iter, nr);
1419         }
1420
1421         return ret;
1422 }
1423
1424 static int io_read(struct io_kiocb *req, const struct sqe_submit *s,
1425                    bool force_nonblock)
1426 {
1427         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
1428         struct kiocb *kiocb = &req->rw;
1429         struct iov_iter iter;
1430         struct file *file;
1431         size_t iov_count;
1432         ssize_t read_size, ret;
1433
1434         ret = io_prep_rw(req, s, force_nonblock);
1435         if (ret)
1436                 return ret;
1437         file = kiocb->ki_filp;
1438
1439         if (unlikely(!(file->f_mode & FMODE_READ)))
1440                 return -EBADF;
1441
1442         ret = io_import_iovec(req->ctx, READ, req, &iovec, &iter);
1443         if (ret < 0)
1444                 return ret;
1445
1446         read_size = ret;
1447         if (req->flags & REQ_F_LINK)
1448                 req->result = read_size;
1449
1450         iov_count = iov_iter_count(&iter);
1451         ret = rw_verify_area(READ, file, &kiocb->ki_pos, iov_count);
1452         if (!ret) {
1453                 ssize_t ret2;
1454
1455                 if (file->f_op->read_iter)
1456                         ret2 = call_read_iter(file, kiocb, &iter);
1457                 else if (req->file->f_op->read)
1458                         ret2 = loop_rw_iter(READ, file, kiocb, &iter);
1459                 else
1460                         ret2 = -EINVAL;
1461
1462                 /*
1463                  * In case of a short read, punt to async. This can happen
1464                  * if we have data partially cached. Alternatively we can
1465                  * return the short read, in which case the application will
1466                  * need to issue another SQE and wait for it. That SQE will
1467                  * need async punt anyway, so it's more efficient to do it
1468                  * here.
1469                  */
1470                 if (force_nonblock && !(req->flags & REQ_F_NOWAIT) &&
1471                     (req->flags & REQ_F_ISREG) &&
1472                     ret2 > 0 && ret2 < read_size)
1473                         ret2 = -EAGAIN;
1474                 /* Catch -EAGAIN return for forced non-blocking submission */
1475                 if (!force_nonblock || ret2 != -EAGAIN) {
1476                         io_rw_done(kiocb, ret2);
1477                 } else {
1478                         /*
1479                          * If ->needs_lock is true, we're already in async
1480                          * context.
1481                          */
1482                         if (!s->needs_lock)
1483                                 io_async_list_note(READ, req, iov_count);
1484                         ret = -EAGAIN;
1485                 }
1486         }
1487         kfree(iovec);
1488         return ret;
1489 }
1490
1491 static int io_write(struct io_kiocb *req, const struct sqe_submit *s,
1492                     bool force_nonblock)
1493 {
1494         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
1495         struct kiocb *kiocb = &req->rw;
1496         struct iov_iter iter;
1497         struct file *file;
1498         size_t iov_count;
1499         ssize_t ret;
1500
1501         ret = io_prep_rw(req, s, force_nonblock);
1502         if (ret)
1503                 return ret;
1504
1505         file = kiocb->ki_filp;
1506         if (unlikely(!(file->f_mode & FMODE_WRITE)))
1507                 return -EBADF;
1508
1509         ret = io_import_iovec(req->ctx, WRITE, req, &iovec, &iter);
1510         if (ret < 0)
1511                 return ret;
1512
1513         if (req->flags & REQ_F_LINK)
1514                 req->result = ret;
1515
1516         iov_count = iov_iter_count(&iter);
1517
1518         ret = -EAGAIN;
1519         if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT)) {
1520                 /* If ->needs_lock is true, we're already in async context. */
1521                 if (!s->needs_lock)
1522                         io_async_list_note(WRITE, req, iov_count);
1523                 goto out_free;
1524         }
1525
1526         ret = rw_verify_area(WRITE, file, &kiocb->ki_pos, iov_count);
1527         if (!ret) {
1528                 ssize_t ret2;
1529
1530                 /*
1531                  * Open-code file_start_write here to grab freeze protection,
1532                  * which will be released by another thread in
1533                  * io_complete_rw().  Fool lockdep by telling it the lock got
1534                  * released so that it doesn't complain about the held lock when
1535                  * we return to userspace.
1536                  */
1537                 if (req->flags & REQ_F_ISREG) {
1538                         __sb_start_write(file_inode(file)->i_sb,
1539                                                 SB_FREEZE_WRITE, true);
1540                         __sb_writers_release(file_inode(file)->i_sb,
1541                                                 SB_FREEZE_WRITE);
1542                 }
1543                 kiocb->ki_flags |= IOCB_WRITE;
1544
1545                 if (!force_nonblock)
1546                         current->signal->rlim[RLIMIT_FSIZE].rlim_cur = req->fsize;
1547
1548                 if (file->f_op->write_iter)
1549                         ret2 = call_write_iter(file, kiocb, &iter);
1550                 else if (req->file->f_op->write)
1551                         ret2 = loop_rw_iter(WRITE, file, kiocb, &iter);
1552                 else
1553                         ret2 = -EINVAL;
1554
1555                 if (!force_nonblock)
1556                         current->signal->rlim[RLIMIT_FSIZE].rlim_cur = RLIM_INFINITY;
1557
1558                 if (!force_nonblock || ret2 != -EAGAIN) {
1559                         io_rw_done(kiocb, ret2);
1560                 } else {
1561                         /*
1562                          * If ->needs_lock is true, we're already in async
1563                          * context.
1564                          */
1565                         if (!s->needs_lock)
1566                                 io_async_list_note(WRITE, req, iov_count);
1567                         ret = -EAGAIN;
1568                 }
1569         }
1570 out_free:
1571         kfree(iovec);
1572         return ret;
1573 }
1574
1575 /*
1576  * IORING_OP_NOP just posts a completion event, nothing else.
1577  */
1578 static int io_nop(struct io_kiocb *req, u64 user_data)
1579 {
1580         struct io_ring_ctx *ctx = req->ctx;
1581         long err = 0;
1582
1583         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1584                 return -EINVAL;
1585
1586         io_cqring_add_event(ctx, user_data, err);
1587         io_put_req(req);
1588         return 0;
1589 }
1590
1591 static int io_prep_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1592 {
1593         struct io_ring_ctx *ctx = req->ctx;
1594
1595         if (!req->file)
1596                 return -EBADF;
1597
1598         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1599                 return -EINVAL;
1600         if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
1601                 return -EINVAL;
1602
1603         return 0;
1604 }
1605
1606 static int io_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe,
1607                     bool force_nonblock)
1608 {
1609         loff_t sqe_off = READ_ONCE(sqe->off);
1610         loff_t sqe_len = READ_ONCE(sqe->len);
1611         loff_t end = sqe_off + sqe_len;
1612         unsigned fsync_flags;
1613         int ret;
1614
1615         fsync_flags = READ_ONCE(sqe->fsync_flags);
1616         if (unlikely(fsync_flags & ~IORING_FSYNC_DATASYNC))
1617                 return -EINVAL;
1618
1619         ret = io_prep_fsync(req, sqe);
1620         if (ret)
1621                 return ret;
1622
1623         /* fsync always requires a blocking context */
1624         if (force_nonblock)
1625                 return -EAGAIN;
1626
1627         ret = vfs_fsync_range(req->rw.ki_filp, sqe_off,
1628                                 end > 0 ? end : LLONG_MAX,
1629                                 fsync_flags & IORING_FSYNC_DATASYNC);
1630
1631         if (ret < 0 && (req->flags & REQ_F_LINK))
1632                 req->flags |= REQ_F_FAIL_LINK;
1633         io_cqring_add_event(req->ctx, sqe->user_data, ret);
1634         io_put_req(req);
1635         return 0;
1636 }
1637
1638 static int io_prep_sfr(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1639 {
1640         struct io_ring_ctx *ctx = req->ctx;
1641         int ret = 0;
1642
1643         if (!req->file)
1644                 return -EBADF;
1645
1646         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1647                 return -EINVAL;
1648         if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
1649                 return -EINVAL;
1650
1651         return ret;
1652 }
1653
1654 static int io_sync_file_range(struct io_kiocb *req,
1655                               const struct io_uring_sqe *sqe,
1656                               bool force_nonblock)
1657 {
1658         loff_t sqe_off;
1659         loff_t sqe_len;
1660         unsigned flags;
1661         int ret;
1662
1663         ret = io_prep_sfr(req, sqe);
1664         if (ret)
1665                 return ret;
1666
1667         /* sync_file_range always requires a blocking context */
1668         if (force_nonblock)
1669                 return -EAGAIN;
1670
1671         sqe_off = READ_ONCE(sqe->off);
1672         sqe_len = READ_ONCE(sqe->len);
1673         flags = READ_ONCE(sqe->sync_range_flags);
1674
1675         ret = sync_file_range(req->rw.ki_filp, sqe_off, sqe_len, flags);
1676
1677         if (ret < 0 && (req->flags & REQ_F_LINK))
1678                 req->flags |= REQ_F_FAIL_LINK;
1679         io_cqring_add_event(req->ctx, sqe->user_data, ret);
1680         io_put_req(req);
1681         return 0;
1682 }
1683
1684 #if defined(CONFIG_NET)
1685 static int io_send_recvmsg(struct io_kiocb *req, const struct io_uring_sqe *sqe,
1686                            bool force_nonblock,
1687                    long (*fn)(struct socket *, struct user_msghdr __user *,
1688                                 unsigned int))
1689 {
1690         struct socket *sock;
1691         int ret;
1692
1693         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
1694                 return -EINVAL;
1695
1696         sock = sock_from_file(req->file, &ret);
1697         if (sock) {
1698                 struct user_msghdr __user *msg;
1699                 unsigned flags;
1700
1701                 flags = READ_ONCE(sqe->msg_flags);
1702                 if (flags & MSG_DONTWAIT)
1703                         req->flags |= REQ_F_NOWAIT;
1704                 else if (force_nonblock)
1705                         flags |= MSG_DONTWAIT;
1706
1707 #ifdef CONFIG_COMPAT
1708                 if (req->ctx->compat)
1709                         flags |= MSG_CMSG_COMPAT;
1710 #endif
1711
1712                 msg = (struct user_msghdr __user *) (unsigned long)
1713                         READ_ONCE(sqe->addr);
1714
1715                 ret = fn(sock, msg, flags);
1716                 if (force_nonblock && ret == -EAGAIN)
1717                         return ret;
1718                 if (ret == -ERESTARTSYS)
1719                         ret = -EINTR;
1720         }
1721
1722         io_req_put_fs(req);
1723         io_cqring_add_event(req->ctx, sqe->user_data, ret);
1724         io_put_req(req);
1725         return 0;
1726 }
1727 #endif
1728
1729 static int io_sendmsg(struct io_kiocb *req, const struct io_uring_sqe *sqe,
1730                       bool force_nonblock)
1731 {
1732 #if defined(CONFIG_NET)
1733         return io_send_recvmsg(req, sqe, force_nonblock, __sys_sendmsg_sock);
1734 #else
1735         return -EOPNOTSUPP;
1736 #endif
1737 }
1738
1739 static int io_recvmsg(struct io_kiocb *req, const struct io_uring_sqe *sqe,
1740                       bool force_nonblock)
1741 {
1742 #if defined(CONFIG_NET)
1743         return io_send_recvmsg(req, sqe, force_nonblock, __sys_recvmsg_sock);
1744 #else
1745         return -EOPNOTSUPP;
1746 #endif
1747 }
1748
1749 static void io_poll_remove_one(struct io_kiocb *req)
1750 {
1751         struct io_poll_iocb *poll = &req->poll;
1752
1753         spin_lock(&poll->head->lock);
1754         WRITE_ONCE(poll->canceled, true);
1755         if (!list_empty(&poll->wait.entry)) {
1756                 list_del_init(&poll->wait.entry);
1757                 io_queue_async_work(req->ctx, req);
1758         }
1759         spin_unlock(&poll->head->lock);
1760
1761         list_del_init(&req->list);
1762 }
1763
1764 static void io_poll_remove_all(struct io_ring_ctx *ctx)
1765 {
1766         struct io_kiocb *req;
1767
1768         spin_lock_irq(&ctx->completion_lock);
1769         while (!list_empty(&ctx->cancel_list)) {
1770                 req = list_first_entry(&ctx->cancel_list, struct io_kiocb,list);
1771                 io_poll_remove_one(req);
1772         }
1773         spin_unlock_irq(&ctx->completion_lock);
1774 }
1775
1776 /*
1777  * Find a running poll command that matches one specified in sqe->addr,
1778  * and remove it if found.
1779  */
1780 static int io_poll_remove(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1781 {
1782         struct io_ring_ctx *ctx = req->ctx;
1783         struct io_kiocb *poll_req, *next;
1784         int ret = -ENOENT;
1785
1786         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
1787                 return -EINVAL;
1788         if (sqe->ioprio || sqe->off || sqe->len || sqe->buf_index ||
1789             sqe->poll_events)
1790                 return -EINVAL;
1791
1792         spin_lock_irq(&ctx->completion_lock);
1793         list_for_each_entry_safe(poll_req, next, &ctx->cancel_list, list) {
1794                 if (READ_ONCE(sqe->addr) == poll_req->user_data) {
1795                         io_poll_remove_one(poll_req);
1796                         ret = 0;
1797                         break;
1798                 }
1799         }
1800         spin_unlock_irq(&ctx->completion_lock);
1801
1802         io_cqring_add_event(req->ctx, sqe->user_data, ret);
1803         io_put_req(req);
1804         return 0;
1805 }
1806
1807 static void io_poll_complete(struct io_ring_ctx *ctx, struct io_kiocb *req,
1808                              __poll_t mask)
1809 {
1810         req->poll.done = true;
1811         io_cqring_fill_event(ctx, req->user_data, mangle_poll(mask));
1812         io_commit_cqring(ctx);
1813 }
1814
1815 static void io_poll_complete_work(struct work_struct *work)
1816 {
1817         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
1818         struct io_poll_iocb *poll = &req->poll;
1819         struct poll_table_struct pt = { ._key = poll->events };
1820         struct io_ring_ctx *ctx = req->ctx;
1821         const struct cred *old_cred;
1822         __poll_t mask = 0;
1823
1824         old_cred = override_creds(ctx->creds);
1825
1826         if (!READ_ONCE(poll->canceled))
1827                 mask = vfs_poll(poll->file, &pt) & poll->events;
1828
1829         /*
1830          * Note that ->ki_cancel callers also delete iocb from active_reqs after
1831          * calling ->ki_cancel.  We need the ctx_lock roundtrip here to
1832          * synchronize with them.  In the cancellation case the list_del_init
1833          * itself is not actually needed, but harmless so we keep it in to
1834          * avoid further branches in the fast path.
1835          */
1836         spin_lock_irq(&ctx->completion_lock);
1837         if (!mask && !READ_ONCE(poll->canceled)) {
1838                 add_wait_queue(poll->head, &poll->wait);
1839                 spin_unlock_irq(&ctx->completion_lock);
1840                 goto out;
1841         }
1842         list_del_init(&req->list);
1843         io_poll_complete(ctx, req, mask);
1844         spin_unlock_irq(&ctx->completion_lock);
1845
1846         io_cqring_ev_posted(ctx);
1847         io_put_req(req);
1848 out:
1849         revert_creds(old_cred);
1850 }
1851
1852 static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
1853                         void *key)
1854 {
1855         struct io_poll_iocb *poll = container_of(wait, struct io_poll_iocb,
1856                                                         wait);
1857         struct io_kiocb *req = container_of(poll, struct io_kiocb, poll);
1858         struct io_ring_ctx *ctx = req->ctx;
1859         __poll_t mask = key_to_poll(key);
1860         unsigned long flags;
1861
1862         /* for instances that support it check for an event match first: */
1863         if (mask && !(mask & poll->events))
1864                 return 0;
1865
1866         list_del_init(&poll->wait.entry);
1867
1868         if (mask && spin_trylock_irqsave(&ctx->completion_lock, flags)) {
1869                 list_del(&req->list);
1870                 io_poll_complete(ctx, req, mask);
1871                 spin_unlock_irqrestore(&ctx->completion_lock, flags);
1872
1873                 io_cqring_ev_posted(ctx);
1874                 io_put_req(req);
1875         } else {
1876                 io_queue_async_work(ctx, req);
1877         }
1878
1879         return 1;
1880 }
1881
1882 struct io_poll_table {
1883         struct poll_table_struct pt;
1884         struct io_kiocb *req;
1885         int error;
1886 };
1887
1888 static void io_poll_queue_proc(struct file *file, struct wait_queue_head *head,
1889                                struct poll_table_struct *p)
1890 {
1891         struct io_poll_table *pt = container_of(p, struct io_poll_table, pt);
1892
1893         if (unlikely(pt->req->poll.head)) {
1894                 pt->error = -EINVAL;
1895                 return;
1896         }
1897
1898         pt->error = 0;
1899         pt->req->poll.head = head;
1900         add_wait_queue(head, &pt->req->poll.wait);
1901 }
1902
1903 static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1904 {
1905         struct io_poll_iocb *poll = &req->poll;
1906         struct io_ring_ctx *ctx = req->ctx;
1907         struct io_poll_table ipt;
1908         bool cancel = false;
1909         __poll_t mask;
1910         u16 events;
1911
1912         if (req->file->f_op->may_pollfree)
1913                 return -EOPNOTSUPP;
1914
1915         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
1916                 return -EINVAL;
1917         if (sqe->addr || sqe->ioprio || sqe->off || sqe->len || sqe->buf_index)
1918                 return -EINVAL;
1919         if (!poll->file)
1920                 return -EBADF;
1921
1922         req->submit.sqe = NULL;
1923         INIT_WORK(&req->work, io_poll_complete_work);
1924         events = READ_ONCE(sqe->poll_events);
1925         poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP;
1926
1927         poll->head = NULL;
1928         poll->done = false;
1929         poll->canceled = false;
1930
1931         ipt.pt._qproc = io_poll_queue_proc;
1932         ipt.pt._key = poll->events;
1933         ipt.req = req;
1934         ipt.error = -EINVAL; /* same as no support for IOCB_CMD_POLL */
1935
1936         /* initialized the list so that we can do list_empty checks */
1937         INIT_LIST_HEAD(&poll->wait.entry);
1938         init_waitqueue_func_entry(&poll->wait, io_poll_wake);
1939
1940         INIT_LIST_HEAD(&req->list);
1941
1942         mask = vfs_poll(poll->file, &ipt.pt) & poll->events;
1943
1944         spin_lock_irq(&ctx->completion_lock);
1945         if (likely(poll->head)) {
1946                 spin_lock(&poll->head->lock);
1947                 if (unlikely(list_empty(&poll->wait.entry))) {
1948                         if (ipt.error)
1949                                 cancel = true;
1950                         ipt.error = 0;
1951                         mask = 0;
1952                 }
1953                 if (mask || ipt.error)
1954                         list_del_init(&poll->wait.entry);
1955                 else if (cancel)
1956                         WRITE_ONCE(poll->canceled, true);
1957                 else if (!poll->done) /* actually waiting for an event */
1958                         list_add_tail(&req->list, &ctx->cancel_list);
1959                 spin_unlock(&poll->head->lock);
1960         }
1961         if (mask) { /* no async, we'd stolen it */
1962                 ipt.error = 0;
1963                 io_poll_complete(ctx, req, mask);
1964         }
1965         spin_unlock_irq(&ctx->completion_lock);
1966
1967         if (mask) {
1968                 io_cqring_ev_posted(ctx);
1969                 io_put_req(req);
1970         }
1971         return ipt.error;
1972 }
1973
1974 static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer)
1975 {
1976         struct io_ring_ctx *ctx;
1977         struct io_kiocb *req, *prev;
1978         unsigned long flags;
1979
1980         req = container_of(timer, struct io_kiocb, timeout.timer);
1981         ctx = req->ctx;
1982         atomic_inc(&ctx->cq_timeouts);
1983
1984         spin_lock_irqsave(&ctx->completion_lock, flags);
1985         /*
1986          * Adjust the reqs sequence before the current one because it
1987          * will consume a slot in the cq_ring and the the cq_tail pointer
1988          * will be increased, otherwise other timeout reqs may return in
1989          * advance without waiting for enough wait_nr.
1990          */
1991         prev = req;
1992         list_for_each_entry_continue_reverse(prev, &ctx->timeout_list, list)
1993                 prev->sequence++;
1994         list_del(&req->list);
1995
1996         io_cqring_fill_event(ctx, req->user_data, -ETIME);
1997         io_commit_cqring(ctx);
1998         spin_unlock_irqrestore(&ctx->completion_lock, flags);
1999
2000         io_cqring_ev_posted(ctx);
2001
2002         io_put_req(req);
2003         return HRTIMER_NORESTART;
2004 }
2005
2006 static int io_timeout(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2007 {
2008         unsigned count;
2009         struct io_ring_ctx *ctx = req->ctx;
2010         struct list_head *entry;
2011         struct timespec64 ts;
2012         unsigned span = 0;
2013
2014         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
2015                 return -EINVAL;
2016         if (sqe->flags || sqe->ioprio || sqe->buf_index || sqe->timeout_flags ||
2017             sqe->len != 1)
2018                 return -EINVAL;
2019
2020         if (get_timespec64(&ts, u64_to_user_ptr(sqe->addr)))
2021                 return -EFAULT;
2022
2023         req->flags |= REQ_F_TIMEOUT;
2024
2025         /*
2026          * sqe->off holds how many events that need to occur for this
2027          * timeout event to be satisfied. If it isn't set, then this is
2028          * a pure timeout request, sequence isn't used.
2029          */
2030         count = READ_ONCE(sqe->off);
2031         if (!count) {
2032                 req->flags |= REQ_F_TIMEOUT_NOSEQ;
2033                 spin_lock_irq(&ctx->completion_lock);
2034                 entry = ctx->timeout_list.prev;
2035                 goto add;
2036         }
2037
2038         req->sequence = ctx->cached_sq_head + count - 1;
2039         /* reuse it to store the count */
2040         req->submit.sequence = count;
2041
2042         /*
2043          * Insertion sort, ensuring the first entry in the list is always
2044          * the one we need first.
2045          */
2046         spin_lock_irq(&ctx->completion_lock);
2047         list_for_each_prev(entry, &ctx->timeout_list) {
2048                 struct io_kiocb *nxt = list_entry(entry, struct io_kiocb, list);
2049                 unsigned nxt_sq_head;
2050                 long long tmp, tmp_nxt;
2051
2052                 if (nxt->flags & REQ_F_TIMEOUT_NOSEQ)
2053                         continue;
2054
2055                 /*
2056                  * Since cached_sq_head + count - 1 can overflow, use type long
2057                  * long to store it.
2058                  */
2059                 tmp = (long long)ctx->cached_sq_head + count - 1;
2060                 nxt_sq_head = nxt->sequence - nxt->submit.sequence + 1;
2061                 tmp_nxt = (long long)nxt_sq_head + nxt->submit.sequence - 1;
2062
2063                 /*
2064                  * cached_sq_head may overflow, and it will never overflow twice
2065                  * once there is some timeout req still be valid.
2066                  */
2067                 if (ctx->cached_sq_head < nxt_sq_head)
2068                         tmp += UINT_MAX;
2069
2070                 if (tmp > tmp_nxt)
2071                         break;
2072
2073                 /*
2074                  * Sequence of reqs after the insert one and itself should
2075                  * be adjusted because each timeout req consumes a slot.
2076                  */
2077                 span++;
2078                 nxt->sequence++;
2079         }
2080         req->sequence -= span;
2081 add:
2082         list_add(&req->list, entry);
2083
2084         hrtimer_init(&req->timeout.timer, CLOCK_MONOTONIC, HRTIMER_MODE_REL);
2085         req->timeout.timer.function = io_timeout_fn;
2086         hrtimer_start(&req->timeout.timer, timespec64_to_ktime(ts),
2087                         HRTIMER_MODE_REL);
2088         spin_unlock_irq(&ctx->completion_lock);
2089         return 0;
2090 }
2091
2092 static int io_req_defer(struct io_ring_ctx *ctx, struct io_kiocb *req,
2093                         struct sqe_submit *s)
2094 {
2095         struct io_uring_sqe *sqe_copy;
2096
2097         if (!io_sequence_defer(ctx, req) && list_empty(&ctx->defer_list))
2098                 return 0;
2099
2100         sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL);
2101         if (!sqe_copy)
2102                 return -EAGAIN;
2103
2104         spin_lock_irq(&ctx->completion_lock);
2105         if (!io_sequence_defer(ctx, req) && list_empty(&ctx->defer_list)) {
2106                 spin_unlock_irq(&ctx->completion_lock);
2107                 kfree(sqe_copy);
2108                 return 0;
2109         }
2110
2111         memcpy(&req->submit, s, sizeof(*s));
2112         memcpy(sqe_copy, s->sqe, sizeof(*sqe_copy));
2113         req->submit.sqe = sqe_copy;
2114
2115         INIT_WORK(&req->work, io_sq_wq_submit_work);
2116         list_add_tail(&req->list, &ctx->defer_list);
2117         spin_unlock_irq(&ctx->completion_lock);
2118         return -EIOCBQUEUED;
2119 }
2120
2121 static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
2122                            const struct sqe_submit *s, bool force_nonblock)
2123 {
2124         int ret;
2125
2126         req->user_data = READ_ONCE(s->sqe->user_data);
2127
2128         if (unlikely(s->index >= ctx->sq_entries))
2129                 return -EINVAL;
2130
2131         switch (req->submit.opcode) {
2132         case IORING_OP_NOP:
2133                 ret = io_nop(req, req->user_data);
2134                 break;
2135         case IORING_OP_READV:
2136                 if (unlikely(s->sqe->buf_index))
2137                         return -EINVAL;
2138                 ret = io_read(req, s, force_nonblock);
2139                 break;
2140         case IORING_OP_WRITEV:
2141                 if (unlikely(s->sqe->buf_index))
2142                         return -EINVAL;
2143                 ret = io_write(req, s, force_nonblock);
2144                 break;
2145         case IORING_OP_READ_FIXED:
2146                 ret = io_read(req, s, force_nonblock);
2147                 break;
2148         case IORING_OP_WRITE_FIXED:
2149                 ret = io_write(req, s, force_nonblock);
2150                 break;
2151         case IORING_OP_FSYNC:
2152                 ret = io_fsync(req, s->sqe, force_nonblock);
2153                 break;
2154         case IORING_OP_POLL_ADD:
2155                 ret = io_poll_add(req, s->sqe);
2156                 break;
2157         case IORING_OP_POLL_REMOVE:
2158                 ret = io_poll_remove(req, s->sqe);
2159                 break;
2160         case IORING_OP_SYNC_FILE_RANGE:
2161                 ret = io_sync_file_range(req, s->sqe, force_nonblock);
2162                 break;
2163         case IORING_OP_SENDMSG:
2164                 ret = io_sendmsg(req, s->sqe, force_nonblock);
2165                 break;
2166         case IORING_OP_RECVMSG:
2167                 ret = io_recvmsg(req, s->sqe, force_nonblock);
2168                 break;
2169         case IORING_OP_TIMEOUT:
2170                 ret = io_timeout(req, s->sqe);
2171                 break;
2172         default:
2173                 ret = -EINVAL;
2174                 break;
2175         }
2176
2177         if (ret)
2178                 return ret;
2179
2180         if (ctx->flags & IORING_SETUP_IOPOLL) {
2181                 if (req->result == -EAGAIN)
2182                         return -EAGAIN;
2183
2184                 /* workqueue context doesn't hold uring_lock, grab it now */
2185                 if (s->needs_lock)
2186                         mutex_lock(&ctx->uring_lock);
2187                 io_iopoll_req_issued(req);
2188                 if (s->needs_lock)
2189                         mutex_unlock(&ctx->uring_lock);
2190         }
2191
2192         return 0;
2193 }
2194
2195 static struct async_list *io_async_list_from_req(struct io_ring_ctx *ctx,
2196                                                  struct io_kiocb *req)
2197 {
2198         switch (req->submit.opcode) {
2199         case IORING_OP_READV:
2200         case IORING_OP_READ_FIXED:
2201                 return &ctx->pending_async[READ];
2202         case IORING_OP_WRITEV:
2203         case IORING_OP_WRITE_FIXED:
2204                 return &ctx->pending_async[WRITE];
2205         default:
2206                 return NULL;
2207         }
2208 }
2209
2210 static inline bool io_req_needs_user(struct io_kiocb *req)
2211 {
2212         return !(req->submit.opcode == IORING_OP_READ_FIXED ||
2213                 req->submit.opcode == IORING_OP_WRITE_FIXED);
2214 }
2215
2216 static void io_sq_wq_submit_work(struct work_struct *work)
2217 {
2218         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
2219         struct fs_struct *old_fs_struct = current->fs;
2220         struct io_ring_ctx *ctx = req->ctx;
2221         struct mm_struct *cur_mm = NULL;
2222         struct async_list *async_list;
2223         const struct cred *old_cred;
2224         LIST_HEAD(req_list);
2225         mm_segment_t old_fs;
2226         int ret;
2227
2228         old_cred = override_creds(ctx->creds);
2229         async_list = io_async_list_from_req(ctx, req);
2230
2231         allow_kernel_signal(SIGINT);
2232 restart:
2233         do {
2234                 struct sqe_submit *s = &req->submit;
2235                 const struct io_uring_sqe *sqe = s->sqe;
2236                 unsigned int flags = req->flags;
2237
2238                 /* Ensure we clear previously set non-block flag */
2239                 req->rw.ki_flags &= ~IOCB_NOWAIT;
2240
2241                 if ((req->fs && req->fs != current->fs) ||
2242                     (!req->fs && current->fs != old_fs_struct)) {
2243                         task_lock(current);
2244                         if (req->fs)
2245                                 current->fs = req->fs;
2246                         else
2247                                 current->fs = old_fs_struct;
2248                         task_unlock(current);
2249                 }
2250
2251                 ret = 0;
2252                 if (io_req_needs_user(req) && !cur_mm) {
2253                         if (!mmget_not_zero(ctx->sqo_mm)) {
2254                                 ret = -EFAULT;
2255                                 goto end_req;
2256                         } else {
2257                                 cur_mm = ctx->sqo_mm;
2258                                 use_mm(cur_mm);
2259                                 old_fs = get_fs();
2260                                 set_fs(USER_DS);
2261                         }
2262                 }
2263
2264                 if (!ret) {
2265                         req->work_task = current;
2266
2267                         /*
2268                          * Pairs with the smp_store_mb() (B) in
2269                          * io_cancel_async_work().
2270                          */
2271                         smp_mb(); /* A */
2272                         if (req->flags & REQ_F_CANCEL) {
2273                                 ret = -ECANCELED;
2274                                 goto end_req;
2275                         }
2276
2277                         s->has_user = cur_mm != NULL;
2278                         s->needs_lock = true;
2279                         do {
2280                                 ret = __io_submit_sqe(ctx, req, s, false);
2281                                 /*
2282                                  * We can get EAGAIN for polled IO even though
2283                                  * we're forcing a sync submission from here,
2284                                  * since we can't wait for request slots on the
2285                                  * block side.
2286                                  */
2287                                 if (ret != -EAGAIN)
2288                                         break;
2289                                 cond_resched();
2290                         } while (1);
2291                 }
2292 end_req:
2293                 spin_lock_irq(&ctx->task_lock);
2294                 list_del_init(&req->task_list);
2295                 spin_unlock_irq(&ctx->task_lock);
2296
2297                 /* drop submission reference */
2298                 io_put_req(req);
2299
2300                 if (ret) {
2301                         io_cqring_add_event(ctx, sqe->user_data, ret);
2302                         io_put_req(req);
2303                 }
2304
2305                 /* async context always use a copy of the sqe */
2306                 kfree(sqe);
2307
2308                 /* req from defer and link list needn't decrease async cnt */
2309                 if (flags & (REQ_F_IO_DRAINED | REQ_F_LINK_DONE))
2310                         goto out;
2311
2312                 if (!async_list)
2313                         break;
2314                 if (!list_empty(&req_list)) {
2315                         req = list_first_entry(&req_list, struct io_kiocb,
2316                                                 list);
2317                         list_del(&req->list);
2318                         continue;
2319                 }
2320                 if (list_empty(&async_list->list))
2321                         break;
2322
2323                 req = NULL;
2324                 spin_lock(&async_list->lock);
2325                 if (list_empty(&async_list->list)) {
2326                         spin_unlock(&async_list->lock);
2327                         break;
2328                 }
2329                 list_splice_init(&async_list->list, &req_list);
2330                 spin_unlock(&async_list->lock);
2331
2332                 req = list_first_entry(&req_list, struct io_kiocb, list);
2333                 list_del(&req->list);
2334         } while (req);
2335
2336         /*
2337          * Rare case of racing with a submitter. If we find the count has
2338          * dropped to zero AND we have pending work items, then restart
2339          * the processing. This is a tiny race window.
2340          */
2341         if (async_list) {
2342                 ret = atomic_dec_return(&async_list->cnt);
2343                 while (!ret && !list_empty(&async_list->list)) {
2344                         spin_lock(&async_list->lock);
2345                         atomic_inc(&async_list->cnt);
2346                         list_splice_init(&async_list->list, &req_list);
2347                         spin_unlock(&async_list->lock);
2348
2349                         if (!list_empty(&req_list)) {
2350                                 req = list_first_entry(&req_list,
2351                                                         struct io_kiocb, list);
2352                                 list_del(&req->list);
2353                                 goto restart;
2354                         }
2355                         ret = atomic_dec_return(&async_list->cnt);
2356                 }
2357         }
2358
2359 out:
2360         disallow_signal(SIGINT);
2361         if (cur_mm) {
2362                 set_fs(old_fs);
2363                 unuse_mm(cur_mm);
2364                 mmput(cur_mm);
2365         }
2366         revert_creds(old_cred);
2367         if (old_fs_struct != current->fs) {
2368                 task_lock(current);
2369                 current->fs = old_fs_struct;
2370                 task_unlock(current);
2371         }
2372 }
2373
2374 /*
2375  * See if we can piggy back onto previously submitted work, that is still
2376  * running. We currently only allow this if the new request is sequential
2377  * to the previous one we punted.
2378  */
2379 static bool io_add_to_prev_work(struct async_list *list, struct io_kiocb *req)
2380 {
2381         bool ret;
2382
2383         if (!list)
2384                 return false;
2385         if (!(req->flags & REQ_F_SEQ_PREV))
2386                 return false;
2387         if (!atomic_read(&list->cnt))
2388                 return false;
2389
2390         ret = true;
2391         spin_lock(&list->lock);
2392         list_add_tail(&req->list, &list->list);
2393         /*
2394          * Ensure we see a simultaneous modification from io_sq_wq_submit_work()
2395          */
2396         smp_mb();
2397         if (!atomic_read(&list->cnt)) {
2398                 list_del_init(&req->list);
2399                 ret = false;
2400         }
2401
2402         if (ret) {
2403                 struct io_ring_ctx *ctx = req->ctx;
2404
2405                 req->files = current->files;
2406
2407                 spin_lock_irq(&ctx->task_lock);
2408                 list_add(&req->task_list, &ctx->task_list);
2409                 req->work_task = NULL;
2410                 spin_unlock_irq(&ctx->task_lock);
2411         }
2412         spin_unlock(&list->lock);
2413         return ret;
2414 }
2415
2416 static bool io_op_needs_file(struct io_kiocb *req)
2417 {
2418         switch (req->submit.opcode) {
2419         case IORING_OP_NOP:
2420         case IORING_OP_POLL_REMOVE:
2421         case IORING_OP_TIMEOUT:
2422                 return false;
2423         default:
2424                 return true;
2425         }
2426 }
2427
2428 static int io_req_set_file(struct io_ring_ctx *ctx, const struct sqe_submit *s,
2429                            struct io_submit_state *state, struct io_kiocb *req)
2430 {
2431         unsigned flags;
2432         int fd;
2433
2434         flags = READ_ONCE(s->sqe->flags);
2435         fd = READ_ONCE(s->sqe->fd);
2436
2437         if (flags & IOSQE_IO_DRAIN)
2438                 req->flags |= REQ_F_IO_DRAIN;
2439         /*
2440          * All io need record the previous position, if LINK vs DARIN,
2441          * it can be used to mark the position of the first IO in the
2442          * link list.
2443          */
2444         req->sequence = s->sequence;
2445
2446         if (!io_op_needs_file(req))
2447                 return 0;
2448
2449         if (flags & IOSQE_FIXED_FILE) {
2450                 if (unlikely(!ctx->user_files ||
2451                     (unsigned) fd >= ctx->nr_user_files))
2452                         return -EBADF;
2453                 req->file = ctx->user_files[fd];
2454                 req->flags |= REQ_F_FIXED_FILE;
2455         } else {
2456                 if (s->needs_fixed_file)
2457                         return -EBADF;
2458                 req->file = io_file_get(state, fd);
2459                 if (unlikely(!req->file))
2460                         return -EBADF;
2461         }
2462
2463         return 0;
2464 }
2465
2466 static int __io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
2467                         struct sqe_submit *s)
2468 {
2469         int ret;
2470
2471         ret = __io_submit_sqe(ctx, req, s, true);
2472
2473         /*
2474          * We async punt it if the file wasn't marked NOWAIT, or if the file
2475          * doesn't support non-blocking read/write attempts
2476          */
2477         if (ret == -EAGAIN && (!(req->flags & REQ_F_NOWAIT) ||
2478             (req->flags & REQ_F_MUST_PUNT))) {
2479                 struct io_uring_sqe *sqe_copy;
2480
2481                 sqe_copy = kmemdup(s->sqe, sizeof(*sqe_copy), GFP_KERNEL);
2482                 if (sqe_copy) {
2483                         struct async_list *list;
2484
2485                         s->sqe = sqe_copy;
2486                         memcpy(&req->submit, s, sizeof(*s));
2487                         list = io_async_list_from_req(ctx, req);
2488                         if (!io_add_to_prev_work(list, req)) {
2489                                 if (list)
2490                                         atomic_inc(&list->cnt);
2491                                 INIT_WORK(&req->work, io_sq_wq_submit_work);
2492                                 io_queue_async_work(ctx, req);
2493                         }
2494
2495                         /*
2496                          * Queued up for async execution, worker will release
2497                          * submit reference when the iocb is actually submitted.
2498                          */
2499                         return 0;
2500                 }
2501         }
2502
2503         /* drop submission reference */
2504         io_put_req(req);
2505
2506         /* and drop final reference, if we failed */
2507         if (ret) {
2508                 io_cqring_add_event(ctx, req->user_data, ret);
2509                 if (req->flags & REQ_F_LINK)
2510                         req->flags |= REQ_F_FAIL_LINK;
2511                 io_put_req(req);
2512         }
2513
2514         return ret;
2515 }
2516
2517 static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
2518                         struct sqe_submit *s)
2519 {
2520         int ret;
2521
2522         ret = io_req_defer(ctx, req, s);
2523         if (ret) {
2524                 if (ret != -EIOCBQUEUED) {
2525                         io_free_req(req);
2526                         io_cqring_add_event(ctx, s->sqe->user_data, ret);
2527                 }
2528                 return 0;
2529         }
2530
2531         return __io_queue_sqe(ctx, req, s);
2532 }
2533
2534 static int io_queue_link_head(struct io_ring_ctx *ctx, struct io_kiocb *req,
2535                               struct sqe_submit *s, struct io_kiocb *shadow)
2536 {
2537         int ret;
2538         int need_submit = false;
2539
2540         if (!shadow)
2541                 return io_queue_sqe(ctx, req, s);
2542
2543         /*
2544          * Mark the first IO in link list as DRAIN, let all the following
2545          * IOs enter the defer list. all IO needs to be completed before link
2546          * list.
2547          */
2548         req->flags |= REQ_F_IO_DRAIN;
2549         ret = io_req_defer(ctx, req, s);
2550         if (ret) {
2551                 if (ret != -EIOCBQUEUED) {
2552                         io_free_req(req);
2553                         __io_free_req(shadow);
2554                         io_cqring_add_event(ctx, s->sqe->user_data, ret);
2555                         return 0;
2556                 }
2557         } else {
2558                 /*
2559                  * If ret == 0 means that all IOs in front of link io are
2560                  * running done. let's queue link head.
2561                  */
2562                 need_submit = true;
2563         }
2564
2565         /* Insert shadow req to defer_list, blocking next IOs */
2566         spin_lock_irq(&ctx->completion_lock);
2567         list_add_tail(&shadow->list, &ctx->defer_list);
2568         spin_unlock_irq(&ctx->completion_lock);
2569
2570         if (need_submit)
2571                 return __io_queue_sqe(ctx, req, s);
2572
2573         return 0;
2574 }
2575
2576 #define SQE_VALID_FLAGS (IOSQE_FIXED_FILE|IOSQE_IO_DRAIN|IOSQE_IO_LINK)
2577
2578 static void io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
2579                           struct io_submit_state *state, struct io_kiocb **link)
2580 {
2581         struct io_uring_sqe *sqe_copy;
2582         struct io_kiocb *req;
2583         int ret;
2584
2585         /* enforce forwards compatibility on users */
2586         if (unlikely(s->sqe->flags & ~SQE_VALID_FLAGS)) {
2587                 ret = -EINVAL;
2588                 goto err;
2589         }
2590
2591         req = io_get_req(ctx, state);
2592         if (unlikely(!req)) {
2593                 ret = -EAGAIN;
2594                 goto err;
2595         }
2596
2597         memcpy(&req->submit, s, sizeof(*s));
2598         ret = io_req_set_file(ctx, s, state, req);
2599         if (unlikely(ret)) {
2600 err_req:
2601                 io_free_req(req);
2602 err:
2603                 io_cqring_add_event(ctx, s->sqe->user_data, ret);
2604                 return;
2605         }
2606
2607         req->user_data = s->sqe->user_data;
2608
2609 #if defined(CONFIG_NET)
2610         switch (req->submit.opcode) {
2611         case IORING_OP_SENDMSG:
2612         case IORING_OP_RECVMSG:
2613                 spin_lock(&current->fs->lock);
2614                 if (!current->fs->in_exec) {
2615                         req->fs = current->fs;
2616                         req->fs->users++;
2617                 }
2618                 spin_unlock(&current->fs->lock);
2619                 if (!req->fs) {
2620                         ret = -EAGAIN;
2621                         goto err_req;
2622                 }
2623         }
2624 #endif
2625
2626         /*
2627          * If we already have a head request, queue this one for async
2628          * submittal once the head completes. If we don't have a head but
2629          * IOSQE_IO_LINK is set in the sqe, start a new head. This one will be
2630          * submitted sync once the chain is complete. If none of those
2631          * conditions are true (normal request), then just queue it.
2632          */
2633         if (*link) {
2634                 struct io_kiocb *prev = *link;
2635
2636                 sqe_copy = kmemdup(s->sqe, sizeof(*sqe_copy), GFP_KERNEL);
2637                 if (!sqe_copy) {
2638                         ret = -EAGAIN;
2639                         goto err_req;
2640                 }
2641
2642                 s->sqe = sqe_copy;
2643                 memcpy(&req->submit, s, sizeof(*s));
2644                 list_add_tail(&req->list, &prev->link_list);
2645         } else if (s->sqe->flags & IOSQE_IO_LINK) {
2646                 req->flags |= REQ_F_LINK;
2647
2648                 memcpy(&req->submit, s, sizeof(*s));
2649                 INIT_LIST_HEAD(&req->link_list);
2650                 *link = req;
2651         } else {
2652                 io_queue_sqe(ctx, req, s);
2653         }
2654 }
2655
2656 /*
2657  * Batched submission is done, ensure local IO is flushed out.
2658  */
2659 static void io_submit_state_end(struct io_submit_state *state)
2660 {
2661         blk_finish_plug(&state->plug);
2662         io_file_put(state);
2663         if (state->free_reqs)
2664                 kmem_cache_free_bulk(req_cachep, state->free_reqs,
2665                                         &state->reqs[state->cur_req]);
2666 }
2667
2668 /*
2669  * Start submission side cache.
2670  */
2671 static void io_submit_state_start(struct io_submit_state *state,
2672                                   struct io_ring_ctx *ctx, unsigned max_ios)
2673 {
2674         blk_start_plug(&state->plug);
2675         state->free_reqs = 0;
2676         state->file = NULL;
2677         state->ios_left = max_ios;
2678 }
2679
2680 static void io_commit_sqring(struct io_ring_ctx *ctx)
2681 {
2682         struct io_rings *rings = ctx->rings;
2683
2684         if (ctx->cached_sq_head != READ_ONCE(rings->sq.head)) {
2685                 /*
2686                  * Ensure any loads from the SQEs are done at this point,
2687                  * since once we write the new head, the application could
2688                  * write new data to them.
2689                  */
2690                 smp_store_release(&rings->sq.head, ctx->cached_sq_head);
2691         }
2692 }
2693
2694 /*
2695  * Fetch an sqe, if one is available. Note that s->sqe will point to memory
2696  * that is mapped by userspace. This means that care needs to be taken to
2697  * ensure that reads are stable, as we cannot rely on userspace always
2698  * being a good citizen. If members of the sqe are validated and then later
2699  * used, it's important that those reads are done through READ_ONCE() to
2700  * prevent a re-load down the line.
2701  */
2702 static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s)
2703 {
2704         struct io_rings *rings = ctx->rings;
2705         u32 *sq_array = ctx->sq_array;
2706         unsigned head;
2707
2708         /*
2709          * The cached sq head (or cq tail) serves two purposes:
2710          *
2711          * 1) allows us to batch the cost of updating the user visible
2712          *    head updates.
2713          * 2) allows the kernel side to track the head on its own, even
2714          *    though the application is the one updating it.
2715          */
2716         head = ctx->cached_sq_head;
2717         /* make sure SQ entry isn't read before tail */
2718         if (head == smp_load_acquire(&rings->sq.tail))
2719                 return false;
2720
2721         head = READ_ONCE(sq_array[head & ctx->sq_mask]);
2722         if (head < ctx->sq_entries) {
2723                 s->index = head;
2724                 s->sqe = &ctx->sq_sqes[head];
2725                 s->opcode = READ_ONCE(s->sqe->opcode);
2726                 s->sequence = ctx->cached_sq_head;
2727                 ctx->cached_sq_head++;
2728                 return true;
2729         }
2730
2731         /* drop invalid entries */
2732         ctx->cached_sq_head++;
2733         ctx->cached_sq_dropped++;
2734         WRITE_ONCE(rings->sq_dropped, ctx->cached_sq_dropped);
2735         return false;
2736 }
2737
2738 static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr,
2739                           bool has_user, bool mm_fault)
2740 {
2741         struct io_submit_state state, *statep = NULL;
2742         struct io_kiocb *link = NULL;
2743         struct io_kiocb *shadow_req = NULL;
2744         bool prev_was_link = false;
2745         int i, submitted = 0;
2746
2747         if (nr > IO_PLUG_THRESHOLD) {
2748                 io_submit_state_start(&state, ctx, nr);
2749                 statep = &state;
2750         }
2751
2752         for (i = 0; i < nr; i++) {
2753                 struct sqe_submit s;
2754
2755                 if (!io_get_sqring(ctx, &s))
2756                         break;
2757
2758                 /*
2759                  * If previous wasn't linked and we have a linked command,
2760                  * that's the end of the chain. Submit the previous link.
2761                  */
2762                 if (!prev_was_link && link) {
2763                         io_queue_link_head(ctx, link, &link->submit, shadow_req);
2764                         link = NULL;
2765                         shadow_req = NULL;
2766                 }
2767                 prev_was_link = (s.sqe->flags & IOSQE_IO_LINK) != 0;
2768
2769                 if (link && (s.sqe->flags & IOSQE_IO_DRAIN)) {
2770                         if (!shadow_req) {
2771                                 shadow_req = io_get_req(ctx, NULL);
2772                                 if (unlikely(!shadow_req))
2773                                         goto out;
2774                                 shadow_req->flags |= (REQ_F_IO_DRAIN | REQ_F_SHADOW_DRAIN);
2775                                 refcount_dec(&shadow_req->refs);
2776                         }
2777                         shadow_req->sequence = s.sequence;
2778                 }
2779
2780 out:
2781                 if (unlikely(mm_fault)) {
2782                         io_cqring_add_event(ctx, s.sqe->user_data,
2783                                                 -EFAULT);
2784                 } else {
2785                         s.has_user = has_user;
2786                         s.needs_lock = true;
2787                         s.needs_fixed_file = true;
2788                         io_submit_sqe(ctx, &s, statep, &link);
2789                         submitted++;
2790                 }
2791         }
2792
2793         if (link)
2794                 io_queue_link_head(ctx, link, &link->submit, shadow_req);
2795         if (statep)
2796                 io_submit_state_end(&state);
2797
2798         return submitted;
2799 }
2800
2801 static int io_sq_thread(void *data)
2802 {
2803         struct io_ring_ctx *ctx = data;
2804         struct mm_struct *cur_mm = NULL;
2805         const struct cred *old_cred;
2806         mm_segment_t old_fs;
2807         DEFINE_WAIT(wait);
2808         unsigned inflight;
2809         unsigned long timeout;
2810
2811         complete(&ctx->sqo_thread_started);
2812
2813         old_fs = get_fs();
2814         set_fs(USER_DS);
2815         old_cred = override_creds(ctx->creds);
2816
2817         timeout = inflight = 0;
2818         while (!kthread_should_park()) {
2819                 bool mm_fault = false;
2820                 unsigned int to_submit;
2821
2822                 if (inflight) {
2823                         unsigned nr_events = 0;
2824
2825                         if (ctx->flags & IORING_SETUP_IOPOLL) {
2826                                 /*
2827                                  * inflight is the count of the maximum possible
2828                                  * entries we submitted, but it can be smaller
2829                                  * if we dropped some of them. If we don't have
2830                                  * poll entries available, then we know that we
2831                                  * have nothing left to poll for. Reset the
2832                                  * inflight count to zero in that case.
2833                                  */
2834                                 mutex_lock(&ctx->uring_lock);
2835                                 if (!list_empty(&ctx->poll_list))
2836                                         io_iopoll_getevents(ctx, &nr_events, 0);
2837                                 else
2838                                         inflight = 0;
2839                                 mutex_unlock(&ctx->uring_lock);
2840                         } else {
2841                                 /*
2842                                  * Normal IO, just pretend everything completed.
2843                                  * We don't have to poll completions for that.
2844                                  */
2845                                 nr_events = inflight;
2846                         }
2847
2848                         inflight -= nr_events;
2849                         if (!inflight)
2850                                 timeout = jiffies + ctx->sq_thread_idle;
2851                 }
2852
2853                 to_submit = io_sqring_entries(ctx);
2854                 if (!to_submit) {
2855                         /*
2856                          * Drop cur_mm before scheduling, we can't hold it for
2857                          * long periods (or over schedule()). Do this before
2858                          * adding ourselves to the waitqueue, as the unuse/drop
2859                          * may sleep.
2860                          */
2861                         if (cur_mm) {
2862                                 unuse_mm(cur_mm);
2863                                 mmput(cur_mm);
2864                                 cur_mm = NULL;
2865                         }
2866
2867                         /*
2868                          * We're polling. If we're within the defined idle
2869                          * period, then let us spin without work before going
2870                          * to sleep.
2871                          */
2872                         if (inflight || !time_after(jiffies, timeout)) {
2873                                 cond_resched();
2874                                 continue;
2875                         }
2876
2877                         prepare_to_wait(&ctx->sqo_wait, &wait,
2878                                                 TASK_INTERRUPTIBLE);
2879
2880                         /* Tell userspace we may need a wakeup call */
2881                         ctx->rings->sq_flags |= IORING_SQ_NEED_WAKEUP;
2882                         /* make sure to read SQ tail after writing flags */
2883                         smp_mb();
2884
2885                         to_submit = io_sqring_entries(ctx);
2886                         if (!to_submit) {
2887                                 if (kthread_should_park()) {
2888                                         finish_wait(&ctx->sqo_wait, &wait);
2889                                         break;
2890                                 }
2891                                 if (signal_pending(current))
2892                                         flush_signals(current);
2893                                 schedule();
2894                                 finish_wait(&ctx->sqo_wait, &wait);
2895
2896                                 ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;
2897                                 continue;
2898                         }
2899                         finish_wait(&ctx->sqo_wait, &wait);
2900
2901                         ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;
2902                 }
2903
2904                 /* Unless all new commands are FIXED regions, grab mm */
2905                 if (!cur_mm) {
2906                         mm_fault = !mmget_not_zero(ctx->sqo_mm);
2907                         if (!mm_fault) {
2908                                 use_mm(ctx->sqo_mm);
2909                                 cur_mm = ctx->sqo_mm;
2910                         }
2911                 }
2912
2913                 to_submit = min(to_submit, ctx->sq_entries);
2914                 inflight += io_submit_sqes(ctx, to_submit, cur_mm != NULL,
2915                                            mm_fault);
2916
2917                 /* Commit SQ ring head once we've consumed all SQEs */
2918                 io_commit_sqring(ctx);
2919         }
2920
2921         set_fs(old_fs);
2922         if (cur_mm) {
2923                 unuse_mm(cur_mm);
2924                 mmput(cur_mm);
2925         }
2926         revert_creds(old_cred);
2927
2928         kthread_parkme();
2929
2930         return 0;
2931 }
2932
2933 static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
2934 {
2935         struct io_submit_state state, *statep = NULL;
2936         struct io_kiocb *link = NULL;
2937         struct io_kiocb *shadow_req = NULL;
2938         bool prev_was_link = false;
2939         int i, submit = 0;
2940
2941         if (to_submit > IO_PLUG_THRESHOLD) {
2942                 io_submit_state_start(&state, ctx, to_submit);
2943                 statep = &state;
2944         }
2945
2946         for (i = 0; i < to_submit; i++) {
2947                 struct sqe_submit s;
2948
2949                 if (!io_get_sqring(ctx, &s))
2950                         break;
2951
2952                 /*
2953                  * If previous wasn't linked and we have a linked command,
2954                  * that's the end of the chain. Submit the previous link.
2955                  */
2956                 if (!prev_was_link && link) {
2957                         io_queue_link_head(ctx, link, &link->submit, shadow_req);
2958                         link = NULL;
2959                         shadow_req = NULL;
2960                 }
2961                 prev_was_link = (s.sqe->flags & IOSQE_IO_LINK) != 0;
2962
2963                 if (link && (s.sqe->flags & IOSQE_IO_DRAIN)) {
2964                         if (!shadow_req) {
2965                                 shadow_req = io_get_req(ctx, NULL);
2966                                 if (unlikely(!shadow_req))
2967                                         goto out;
2968                                 shadow_req->flags |= (REQ_F_IO_DRAIN | REQ_F_SHADOW_DRAIN);
2969                                 refcount_dec(&shadow_req->refs);
2970                         }
2971                         shadow_req->sequence = s.sequence;
2972                 }
2973
2974 out:
2975                 s.has_user = true;
2976                 s.needs_lock = false;
2977                 s.needs_fixed_file = false;
2978                 submit++;
2979                 io_submit_sqe(ctx, &s, statep, &link);
2980         }
2981
2982         if (link)
2983                 io_queue_link_head(ctx, link, &link->submit, shadow_req);
2984         if (statep)
2985                 io_submit_state_end(statep);
2986
2987         io_commit_sqring(ctx);
2988
2989         return submit;
2990 }
2991
2992 struct io_wait_queue {
2993         struct wait_queue_entry wq;
2994         struct io_ring_ctx *ctx;
2995         unsigned to_wait;
2996         unsigned nr_timeouts;
2997 };
2998
2999 static inline bool io_should_wake(struct io_wait_queue *iowq)
3000 {
3001         struct io_ring_ctx *ctx = iowq->ctx;
3002
3003         /*
3004          * Wake up if we have enough events, or if a timeout occured since we
3005          * started waiting. For timeouts, we always want to return to userspace,
3006          * regardless of event count.
3007          */
3008         return io_cqring_events(ctx->rings) >= iowq->to_wait ||
3009                         atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts;
3010 }
3011
3012 static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
3013                             int wake_flags, void *key)
3014 {
3015         struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue,
3016                                                         wq);
3017
3018         if (!io_should_wake(iowq))
3019                 return -1;
3020
3021         return autoremove_wake_function(curr, mode, wake_flags, key);
3022 }
3023
3024 /*
3025  * Wait until events become available, if we don't already have some. The
3026  * application must reap them itself, as they reside on the shared cq ring.
3027  */
3028 static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
3029                           const sigset_t __user *sig, size_t sigsz)
3030 {
3031         struct io_wait_queue iowq = {
3032                 .wq = {
3033                         .private        = current,
3034                         .func           = io_wake_function,
3035                         .entry          = LIST_HEAD_INIT(iowq.wq.entry),
3036                 },
3037                 .ctx            = ctx,
3038                 .to_wait        = min_events,
3039         };
3040         struct io_rings *rings = ctx->rings;
3041         int ret;
3042
3043         if (io_cqring_events(rings) >= min_events)
3044                 return 0;
3045
3046         if (sig) {
3047 #ifdef CONFIG_COMPAT
3048                 if (in_compat_syscall())
3049                         ret = set_compat_user_sigmask((const compat_sigset_t __user *)sig,
3050                                                       sigsz);
3051                 else
3052 #endif
3053                         ret = set_user_sigmask(sig, sigsz);
3054
3055                 if (ret)
3056                         return ret;
3057         }
3058
3059         ret = 0;
3060         iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
3061         do {
3062                 prepare_to_wait_exclusive(&ctx->wait, &iowq.wq,
3063                                                 TASK_INTERRUPTIBLE);
3064                 if (io_should_wake(&iowq))
3065                         break;
3066                 schedule();
3067                 if (signal_pending(current)) {
3068                         ret = -ERESTARTSYS;
3069                         break;
3070                 }
3071         } while (1);
3072         finish_wait(&ctx->wait, &iowq.wq);
3073
3074         restore_saved_sigmask_unless(ret == -ERESTARTSYS);
3075         if (ret == -ERESTARTSYS)
3076                 ret = -EINTR;
3077
3078         return READ_ONCE(rings->cq.head) == READ_ONCE(rings->cq.tail) ? ret : 0;
3079 }
3080
3081 static void __io_sqe_files_unregister(struct io_ring_ctx *ctx)
3082 {
3083 #if defined(CONFIG_UNIX)
3084         if (ctx->ring_sock) {
3085                 struct sock *sock = ctx->ring_sock->sk;
3086                 struct sk_buff *skb;
3087
3088                 while ((skb = skb_dequeue(&sock->sk_receive_queue)) != NULL)
3089                         kfree_skb(skb);
3090         }
3091 #else
3092         int i;
3093
3094         for (i = 0; i < ctx->nr_user_files; i++)
3095                 fput(ctx->user_files[i]);
3096 #endif
3097 }
3098
3099 static int io_sqe_files_unregister(struct io_ring_ctx *ctx)
3100 {
3101         if (!ctx->user_files)
3102                 return -ENXIO;
3103
3104         __io_sqe_files_unregister(ctx);
3105         kfree(ctx->user_files);
3106         ctx->user_files = NULL;
3107         ctx->nr_user_files = 0;
3108         return 0;
3109 }
3110
3111 static void io_sq_thread_stop(struct io_ring_ctx *ctx)
3112 {
3113         if (ctx->sqo_thread) {
3114                 wait_for_completion(&ctx->sqo_thread_started);
3115                 /*
3116                  * The park is a bit of a work-around, without it we get
3117                  * warning spews on shutdown with SQPOLL set and affinity
3118                  * set to a single CPU.
3119                  */
3120                 kthread_park(ctx->sqo_thread);
3121                 kthread_stop(ctx->sqo_thread);
3122                 ctx->sqo_thread = NULL;
3123         }
3124 }
3125
3126 static void io_finish_async(struct io_ring_ctx *ctx)
3127 {
3128         int i;
3129
3130         io_sq_thread_stop(ctx);
3131
3132         for (i = 0; i < ARRAY_SIZE(ctx->sqo_wq); i++) {
3133                 if (ctx->sqo_wq[i]) {
3134                         destroy_workqueue(ctx->sqo_wq[i]);
3135                         ctx->sqo_wq[i] = NULL;
3136                 }
3137         }
3138 }
3139
3140 #if defined(CONFIG_UNIX)
3141 static void io_destruct_skb(struct sk_buff *skb)
3142 {
3143         struct io_ring_ctx *ctx = skb->sk->sk_user_data;
3144         int i;
3145
3146         for (i = 0; i < ARRAY_SIZE(ctx->sqo_wq); i++)
3147                 if (ctx->sqo_wq[i])
3148                         flush_workqueue(ctx->sqo_wq[i]);
3149
3150         unix_destruct_scm(skb);
3151 }
3152
3153 /*
3154  * Ensure the UNIX gc is aware of our file set, so we are certain that
3155  * the io_uring can be safely unregistered on process exit, even if we have
3156  * loops in the file referencing.
3157  */
3158 static int __io_sqe_files_scm(struct io_ring_ctx *ctx, int nr, int offset)
3159 {
3160         struct sock *sk = ctx->ring_sock->sk;
3161         struct scm_fp_list *fpl;
3162         struct sk_buff *skb;
3163         int i;
3164
3165         fpl = kzalloc(sizeof(*fpl), GFP_KERNEL);
3166         if (!fpl)
3167                 return -ENOMEM;
3168
3169         skb = alloc_skb(0, GFP_KERNEL);
3170         if (!skb) {
3171                 kfree(fpl);
3172                 return -ENOMEM;
3173         }
3174
3175         skb->sk = sk;
3176         skb->scm_io_uring = 1;
3177         skb->destructor = io_destruct_skb;
3178
3179         fpl->user = get_uid(ctx->user);
3180         for (i = 0; i < nr; i++) {
3181                 fpl->fp[i] = get_file(ctx->user_files[i + offset]);
3182                 unix_inflight(fpl->user, fpl->fp[i]);
3183         }
3184
3185         fpl->max = fpl->count = nr;
3186         UNIXCB(skb).fp = fpl;
3187         refcount_add(skb->truesize, &sk->sk_wmem_alloc);
3188         skb_queue_head(&sk->sk_receive_queue, skb);
3189
3190         for (i = 0; i < nr; i++)
3191                 fput(fpl->fp[i]);
3192
3193         return 0;
3194 }
3195
3196 /*
3197  * If UNIX sockets are enabled, fd passing can cause a reference cycle which
3198  * causes regular reference counting to break down. We rely on the UNIX
3199  * garbage collection to take care of this problem for us.
3200  */
3201 static int io_sqe_files_scm(struct io_ring_ctx *ctx)
3202 {
3203         unsigned left, total;
3204         int ret = 0;
3205
3206         total = 0;
3207         left = ctx->nr_user_files;
3208         while (left) {
3209                 unsigned this_files = min_t(unsigned, left, SCM_MAX_FD);
3210
3211                 ret = __io_sqe_files_scm(ctx, this_files, total);
3212                 if (ret)
3213                         break;
3214                 left -= this_files;
3215                 total += this_files;
3216         }
3217
3218         if (!ret)
3219                 return 0;
3220
3221         while (total < ctx->nr_user_files) {
3222                 fput(ctx->user_files[total]);
3223                 total++;
3224         }
3225
3226         return ret;
3227 }
3228 #else
3229 static int io_sqe_files_scm(struct io_ring_ctx *ctx)
3230 {
3231         return 0;
3232 }
3233 #endif
3234
3235 static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
3236                                  unsigned nr_args)
3237 {
3238         __s32 __user *fds = (__s32 __user *) arg;
3239         int fd, ret = 0;
3240         unsigned i;
3241
3242         if (ctx->user_files)
3243                 return -EBUSY;
3244         if (!nr_args)
3245                 return -EINVAL;
3246         if (nr_args > IORING_MAX_FIXED_FILES)
3247                 return -EMFILE;
3248
3249         ctx->user_files = kcalloc(nr_args, sizeof(struct file *), GFP_KERNEL);
3250         if (!ctx->user_files)
3251                 return -ENOMEM;
3252
3253         for (i = 0; i < nr_args; i++) {
3254                 ret = -EFAULT;
3255                 if (copy_from_user(&fd, &fds[i], sizeof(fd)))
3256                         break;
3257
3258                 ctx->user_files[i] = fget(fd);
3259
3260                 ret = -EBADF;
3261                 if (!ctx->user_files[i])
3262                         break;
3263                 /*
3264                  * Don't allow io_uring instances to be registered. If UNIX
3265                  * isn't enabled, then this causes a reference cycle and this
3266                  * instance can never get freed. If UNIX is enabled we'll
3267                  * handle it just fine, but there's still no point in allowing
3268                  * a ring fd as it doesn't support regular read/write anyway.
3269                  */
3270                 if (ctx->user_files[i]->f_op == &io_uring_fops) {
3271                         fput(ctx->user_files[i]);
3272                         break;
3273                 }
3274                 ctx->nr_user_files++;
3275                 ret = 0;
3276         }
3277
3278         if (ret) {
3279                 for (i = 0; i < ctx->nr_user_files; i++)
3280                         fput(ctx->user_files[i]);
3281
3282                 kfree(ctx->user_files);
3283                 ctx->user_files = NULL;
3284                 ctx->nr_user_files = 0;
3285                 return ret;
3286         }
3287
3288         ret = io_sqe_files_scm(ctx);
3289         if (ret)
3290                 io_sqe_files_unregister(ctx);
3291
3292         return ret;
3293 }
3294
3295 static int io_sq_offload_start(struct io_ring_ctx *ctx,
3296                                struct io_uring_params *p)
3297 {
3298         int ret;
3299
3300         mmgrab(current->mm);
3301         ctx->sqo_mm = current->mm;
3302
3303         if (ctx->flags & IORING_SETUP_SQPOLL) {
3304                 ret = -EPERM;
3305                 if (!capable(CAP_SYS_ADMIN))
3306                         goto err;
3307
3308                 ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
3309                 if (!ctx->sq_thread_idle)
3310                         ctx->sq_thread_idle = HZ;
3311
3312                 if (p->flags & IORING_SETUP_SQ_AFF) {
3313                         int cpu = p->sq_thread_cpu;
3314
3315                         ret = -EINVAL;
3316                         if (cpu >= nr_cpu_ids)
3317                                 goto err;
3318                         if (!cpu_online(cpu))
3319                                 goto err;
3320
3321                         ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread,
3322                                                         ctx, cpu,
3323                                                         "io_uring-sq");
3324                 } else {
3325                         ctx->sqo_thread = kthread_create(io_sq_thread, ctx,
3326                                                         "io_uring-sq");
3327                 }
3328                 if (IS_ERR(ctx->sqo_thread)) {
3329                         ret = PTR_ERR(ctx->sqo_thread);
3330                         ctx->sqo_thread = NULL;
3331                         goto err;
3332                 }
3333                 wake_up_process(ctx->sqo_thread);
3334         } else if (p->flags & IORING_SETUP_SQ_AFF) {
3335                 /* Can't have SQ_AFF without SQPOLL */
3336                 ret = -EINVAL;
3337                 goto err;
3338         }
3339
3340         /* Do QD, or 2 * CPUS, whatever is smallest */
3341         ctx->sqo_wq[0] = alloc_workqueue("io_ring-wq",
3342                         WQ_UNBOUND | WQ_FREEZABLE,
3343                         min(ctx->sq_entries - 1, 2 * num_online_cpus()));
3344         if (!ctx->sqo_wq[0]) {
3345                 ret = -ENOMEM;
3346                 goto err;
3347         }
3348
3349         /*
3350          * This is for buffered writes, where we want to limit the parallelism
3351          * due to file locking in file systems. As "normal" buffered writes
3352          * should parellelize on writeout quite nicely, limit us to having 2
3353          * pending. This avoids massive contention on the inode when doing
3354          * buffered async writes.
3355          */
3356         ctx->sqo_wq[1] = alloc_workqueue("io_ring-write-wq",
3357                                                 WQ_UNBOUND | WQ_FREEZABLE, 2);
3358         if (!ctx->sqo_wq[1]) {
3359                 ret = -ENOMEM;
3360                 goto err;
3361         }
3362
3363         return 0;
3364 err:
3365         io_finish_async(ctx);
3366         mmdrop(ctx->sqo_mm);
3367         ctx->sqo_mm = NULL;
3368         return ret;
3369 }
3370
3371 static void io_unaccount_mem(struct user_struct *user, unsigned long nr_pages)
3372 {
3373         atomic_long_sub(nr_pages, &user->locked_vm);
3374 }
3375
3376 static int io_account_mem(struct user_struct *user, unsigned long nr_pages)
3377 {
3378         unsigned long page_limit, cur_pages, new_pages;
3379
3380         /* Don't allow more pages than we can safely lock */
3381         page_limit = rlimit(RLIMIT_MEMLOCK) >> PAGE_SHIFT;
3382
3383         do {
3384                 cur_pages = atomic_long_read(&user->locked_vm);
3385                 new_pages = cur_pages + nr_pages;
3386                 if (new_pages > page_limit)
3387                         return -ENOMEM;
3388         } while (atomic_long_cmpxchg(&user->locked_vm, cur_pages,
3389                                         new_pages) != cur_pages);
3390
3391         return 0;
3392 }
3393
3394 static void io_mem_free(void *ptr)
3395 {
3396         struct page *page;
3397
3398         if (!ptr)
3399                 return;
3400
3401         page = virt_to_head_page(ptr);
3402         if (put_page_testzero(page))
3403                 free_compound_page(page);
3404 }
3405
3406 static void *io_mem_alloc(size_t size)
3407 {
3408         gfp_t gfp_flags = GFP_KERNEL | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP |
3409                                 __GFP_NORETRY;
3410
3411         return (void *) __get_free_pages(gfp_flags, get_order(size));
3412 }
3413
3414 static unsigned long rings_size(unsigned sq_entries, unsigned cq_entries,
3415                                 size_t *sq_offset)
3416 {
3417         struct io_rings *rings;
3418         size_t off, sq_array_size;
3419
3420         off = struct_size(rings, cqes, cq_entries);
3421         if (off == SIZE_MAX)
3422                 return SIZE_MAX;
3423
3424 #ifdef CONFIG_SMP
3425         off = ALIGN(off, SMP_CACHE_BYTES);
3426         if (off == 0)
3427                 return SIZE_MAX;
3428 #endif
3429
3430         if (sq_offset)
3431                 *sq_offset = off;
3432
3433         sq_array_size = array_size(sizeof(u32), sq_entries);
3434         if (sq_array_size == SIZE_MAX)
3435                 return SIZE_MAX;
3436
3437         if (check_add_overflow(off, sq_array_size, &off))
3438                 return SIZE_MAX;
3439
3440         return off;
3441 }
3442
3443 static unsigned long ring_pages(unsigned sq_entries, unsigned cq_entries)
3444 {
3445         size_t pages;
3446
3447         pages = (size_t)1 << get_order(
3448                 rings_size(sq_entries, cq_entries, NULL));
3449         pages += (size_t)1 << get_order(
3450                 array_size(sizeof(struct io_uring_sqe), sq_entries));
3451
3452         return pages;
3453 }
3454
3455 static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx)
3456 {
3457         int i, j;
3458
3459         if (!ctx->user_bufs)
3460                 return -ENXIO;
3461
3462         for (i = 0; i < ctx->nr_user_bufs; i++) {
3463                 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
3464
3465                 for (j = 0; j < imu->nr_bvecs; j++)
3466                         put_user_page(imu->bvec[j].bv_page);
3467
3468                 if (ctx->account_mem)
3469                         io_unaccount_mem(ctx->user, imu->nr_bvecs);
3470                 kvfree(imu->bvec);
3471                 imu->nr_bvecs = 0;
3472         }
3473
3474         kfree(ctx->user_bufs);
3475         ctx->user_bufs = NULL;
3476         ctx->nr_user_bufs = 0;
3477         return 0;
3478 }
3479
3480 static int io_copy_iov(struct io_ring_ctx *ctx, struct iovec *dst,
3481                        void __user *arg, unsigned index)
3482 {
3483         struct iovec __user *src;
3484
3485 #ifdef CONFIG_COMPAT
3486         if (ctx->compat) {
3487                 struct compat_iovec __user *ciovs;
3488                 struct compat_iovec ciov;
3489
3490                 ciovs = (struct compat_iovec __user *) arg;
3491                 if (copy_from_user(&ciov, &ciovs[index], sizeof(ciov)))
3492                         return -EFAULT;
3493
3494                 dst->iov_base = (void __user *) (unsigned long) ciov.iov_base;
3495                 dst->iov_len = ciov.iov_len;
3496                 return 0;
3497         }
3498 #endif
3499         src = (struct iovec __user *) arg;
3500         if (copy_from_user(dst, &src[index], sizeof(*dst)))
3501                 return -EFAULT;
3502         return 0;
3503 }
3504
3505 static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
3506                                   unsigned nr_args)
3507 {
3508         struct vm_area_struct **vmas = NULL;
3509         struct page **pages = NULL;
3510         int i, j, got_pages = 0;
3511         int ret = -EINVAL;
3512
3513         if (ctx->user_bufs)
3514                 return -EBUSY;
3515         if (!nr_args || nr_args > UIO_MAXIOV)
3516                 return -EINVAL;
3517
3518         ctx->user_bufs = kcalloc(nr_args, sizeof(struct io_mapped_ubuf),
3519                                         GFP_KERNEL);
3520         if (!ctx->user_bufs)
3521                 return -ENOMEM;
3522
3523         for (i = 0; i < nr_args; i++) {
3524                 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
3525                 unsigned long off, start, end, ubuf;
3526                 int pret, nr_pages;
3527                 struct iovec iov;
3528                 size_t size;
3529
3530                 ret = io_copy_iov(ctx, &iov, arg, i);
3531                 if (ret)
3532                         goto err;
3533
3534                 /*
3535                  * Don't impose further limits on the size and buffer
3536                  * constraints here, we'll -EINVAL later when IO is
3537                  * submitted if they are wrong.
3538                  */
3539                 ret = -EFAULT;
3540                 if (!iov.iov_base || !iov.iov_len)
3541                         goto err;
3542
3543                 /* arbitrary limit, but we need something */
3544                 if (iov.iov_len > SZ_1G)
3545                         goto err;
3546
3547                 ubuf = (unsigned long) iov.iov_base;
3548                 end = (ubuf + iov.iov_len + PAGE_SIZE - 1) >> PAGE_SHIFT;
3549                 start = ubuf >> PAGE_SHIFT;
3550                 nr_pages = end - start;
3551
3552                 if (ctx->account_mem) {
3553                         ret = io_account_mem(ctx->user, nr_pages);
3554                         if (ret)
3555                                 goto err;
3556                 }
3557
3558                 ret = 0;
3559                 if (!pages || nr_pages > got_pages) {
3560                         kvfree(vmas);
3561                         kvfree(pages);
3562                         pages = kvmalloc_array(nr_pages, sizeof(struct page *),
3563                                                 GFP_KERNEL);
3564                         vmas = kvmalloc_array(nr_pages,
3565                                         sizeof(struct vm_area_struct *),
3566                                         GFP_KERNEL);
3567                         if (!pages || !vmas) {
3568                                 ret = -ENOMEM;
3569                                 if (ctx->account_mem)
3570                                         io_unaccount_mem(ctx->user, nr_pages);
3571                                 goto err;
3572                         }
3573                         got_pages = nr_pages;
3574                 }
3575
3576                 imu->bvec = kvmalloc_array(nr_pages, sizeof(struct bio_vec),
3577                                                 GFP_KERNEL);
3578                 ret = -ENOMEM;
3579                 if (!imu->bvec) {
3580                         if (ctx->account_mem)
3581                                 io_unaccount_mem(ctx->user, nr_pages);
3582                         goto err;
3583                 }
3584
3585                 ret = 0;
3586                 down_read(&current->mm->mmap_sem);
3587                 pret = get_user_pages(ubuf, nr_pages,
3588                                       FOLL_WRITE | FOLL_LONGTERM,
3589                                       pages, vmas);
3590                 if (pret == nr_pages) {
3591                         /* don't support file backed memory */
3592                         for (j = 0; j < nr_pages; j++) {
3593                                 struct vm_area_struct *vma = vmas[j];
3594
3595                                 if (vma->vm_file &&
3596                                     !is_file_hugepages(vma->vm_file)) {
3597                                         ret = -EOPNOTSUPP;
3598                                         break;
3599                                 }
3600                         }
3601                 } else {
3602                         ret = pret < 0 ? pret : -EFAULT;
3603                 }
3604                 up_read(&current->mm->mmap_sem);
3605                 if (ret) {
3606                         /*
3607                          * if we did partial map, or found file backed vmas,
3608                          * release any pages we did get
3609                          */
3610                         if (pret > 0)
3611                                 put_user_pages(pages, pret);
3612                         if (ctx->account_mem)
3613                                 io_unaccount_mem(ctx->user, nr_pages);
3614                         kvfree(imu->bvec);
3615                         goto err;
3616                 }
3617
3618                 off = ubuf & ~PAGE_MASK;
3619                 size = iov.iov_len;
3620                 for (j = 0; j < nr_pages; j++) {
3621                         size_t vec_len;
3622
3623                         vec_len = min_t(size_t, size, PAGE_SIZE - off);
3624                         imu->bvec[j].bv_page = pages[j];
3625                         imu->bvec[j].bv_len = vec_len;
3626                         imu->bvec[j].bv_offset = off;
3627                         off = 0;
3628                         size -= vec_len;
3629                 }
3630                 /* store original address for later verification */
3631                 imu->ubuf = ubuf;
3632                 imu->len = iov.iov_len;
3633                 imu->nr_bvecs = nr_pages;
3634
3635                 ctx->nr_user_bufs++;
3636         }
3637         kvfree(pages);
3638         kvfree(vmas);
3639         return 0;
3640 err:
3641         kvfree(pages);
3642         kvfree(vmas);
3643         io_sqe_buffer_unregister(ctx);
3644         return ret;
3645 }
3646
3647 static int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg)
3648 {
3649         __s32 __user *fds = arg;
3650         int fd;
3651
3652         if (ctx->cq_ev_fd)
3653                 return -EBUSY;
3654
3655         if (copy_from_user(&fd, fds, sizeof(*fds)))
3656                 return -EFAULT;
3657
3658         ctx->cq_ev_fd = eventfd_ctx_fdget(fd);
3659         if (IS_ERR(ctx->cq_ev_fd)) {
3660                 int ret = PTR_ERR(ctx->cq_ev_fd);
3661                 ctx->cq_ev_fd = NULL;
3662                 return ret;
3663         }
3664
3665         return 0;
3666 }
3667
3668 static int io_eventfd_unregister(struct io_ring_ctx *ctx)
3669 {
3670         if (ctx->cq_ev_fd) {
3671                 eventfd_ctx_put(ctx->cq_ev_fd);
3672                 ctx->cq_ev_fd = NULL;
3673                 return 0;
3674         }
3675
3676         return -ENXIO;
3677 }
3678
3679 static void io_ring_ctx_free(struct io_ring_ctx *ctx)
3680 {
3681         io_finish_async(ctx);
3682         if (ctx->sqo_mm)
3683                 mmdrop(ctx->sqo_mm);
3684
3685         io_iopoll_reap_events(ctx);
3686         io_sqe_buffer_unregister(ctx);
3687         io_sqe_files_unregister(ctx);
3688         io_eventfd_unregister(ctx);
3689
3690 #if defined(CONFIG_UNIX)
3691         if (ctx->ring_sock) {
3692                 ctx->ring_sock->file = NULL; /* so that iput() is called */
3693                 sock_release(ctx->ring_sock);
3694         }
3695 #endif
3696
3697         io_mem_free(ctx->rings);
3698         io_mem_free(ctx->sq_sqes);
3699
3700         percpu_ref_exit(&ctx->refs);
3701         if (ctx->account_mem)
3702                 io_unaccount_mem(ctx->user,
3703                                 ring_pages(ctx->sq_entries, ctx->cq_entries));
3704         free_uid(ctx->user);
3705         if (ctx->creds)
3706                 put_cred(ctx->creds);
3707         kfree(ctx);
3708 }
3709
3710 static __poll_t io_uring_poll(struct file *file, poll_table *wait)
3711 {
3712         struct io_ring_ctx *ctx = file->private_data;
3713         __poll_t mask = 0;
3714
3715         poll_wait(file, &ctx->cq_wait, wait);
3716         /*
3717          * synchronizes with barrier from wq_has_sleeper call in
3718          * io_commit_cqring
3719          */
3720         smp_rmb();
3721         if (READ_ONCE(ctx->rings->sq.tail) - ctx->cached_sq_head !=
3722             ctx->rings->sq_ring_entries)
3723                 mask |= EPOLLOUT | EPOLLWRNORM;
3724         if (READ_ONCE(ctx->rings->cq.head) != ctx->cached_cq_tail)
3725                 mask |= EPOLLIN | EPOLLRDNORM;
3726
3727         return mask;
3728 }
3729
3730 static int io_uring_fasync(int fd, struct file *file, int on)
3731 {
3732         struct io_ring_ctx *ctx = file->private_data;
3733
3734         return fasync_helper(fd, file, on, &ctx->cq_fasync);
3735 }
3736
3737 static void io_cancel_async_work(struct io_ring_ctx *ctx,
3738                                  struct files_struct *files)
3739 {
3740         struct io_kiocb *req;
3741
3742         spin_lock_irq(&ctx->task_lock);
3743
3744         list_for_each_entry(req, &ctx->task_list, task_list) {
3745                 if (files && req->files != files)
3746                         continue;
3747
3748                 /*
3749                  * The below executes an smp_mb(), which matches with the
3750                  * smp_mb() (A) in io_sq_wq_submit_work() such that either
3751                  * we store REQ_F_CANCEL flag to req->flags or we see the
3752                  * req->work_task setted in io_sq_wq_submit_work().
3753                  */
3754                 smp_store_mb(req->flags, req->flags | REQ_F_CANCEL); /* B */
3755
3756                 if (req->work_task)
3757                         send_sig(SIGINT, req->work_task, 1);
3758         }
3759         spin_unlock_irq(&ctx->task_lock);
3760 }
3761
3762 static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
3763 {
3764         mutex_lock(&ctx->uring_lock);
3765         percpu_ref_kill(&ctx->refs);
3766         mutex_unlock(&ctx->uring_lock);
3767
3768         io_cancel_async_work(ctx, NULL);
3769         io_kill_timeouts(ctx);
3770         io_poll_remove_all(ctx);
3771         io_iopoll_reap_events(ctx);
3772         wait_for_completion(&ctx->ctx_done);
3773         io_ring_ctx_free(ctx);
3774 }
3775
3776 static int io_uring_flush(struct file *file, void *data)
3777 {
3778         struct io_ring_ctx *ctx = file->private_data;
3779
3780         if (fatal_signal_pending(current) || (current->flags & PF_EXITING))
3781                 io_cancel_async_work(ctx, data);
3782
3783         return 0;
3784 }
3785
3786 static int io_uring_release(struct inode *inode, struct file *file)
3787 {
3788         struct io_ring_ctx *ctx = file->private_data;
3789
3790         file->private_data = NULL;
3791         io_ring_ctx_wait_and_kill(ctx);
3792         return 0;
3793 }
3794
3795 static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
3796 {
3797         loff_t offset = (loff_t) vma->vm_pgoff << PAGE_SHIFT;
3798         unsigned long sz = vma->vm_end - vma->vm_start;
3799         struct io_ring_ctx *ctx = file->private_data;
3800         unsigned long pfn;
3801         struct page *page;
3802         void *ptr;
3803
3804         switch (offset) {
3805         case IORING_OFF_SQ_RING:
3806         case IORING_OFF_CQ_RING:
3807                 ptr = ctx->rings;
3808                 break;
3809         case IORING_OFF_SQES:
3810                 ptr = ctx->sq_sqes;
3811                 break;
3812         default:
3813                 return -EINVAL;
3814         }
3815
3816         page = virt_to_head_page(ptr);
3817         if (sz > page_size(page))
3818                 return -EINVAL;
3819
3820         pfn = virt_to_phys(ptr) >> PAGE_SHIFT;
3821         return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot);
3822 }
3823
3824 SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
3825                 u32, min_complete, u32, flags, const sigset_t __user *, sig,
3826                 size_t, sigsz)
3827 {
3828         struct io_ring_ctx *ctx;
3829         long ret = -EBADF;
3830         int submitted = 0;
3831         struct fd f;
3832
3833         if (flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP))
3834                 return -EINVAL;
3835
3836         f = fdget(fd);
3837         if (!f.file)
3838                 return -EBADF;
3839
3840         ret = -EOPNOTSUPP;
3841         if (f.file->f_op != &io_uring_fops)
3842                 goto out_fput;
3843
3844         ret = -ENXIO;
3845         ctx = f.file->private_data;
3846         if (!percpu_ref_tryget(&ctx->refs))
3847                 goto out_fput;
3848
3849         /*
3850          * For SQ polling, the thread will do all submissions and completions.
3851          * Just return the requested submit count, and wake the thread if
3852          * we were asked to.
3853          */
3854         ret = 0;
3855         if (ctx->flags & IORING_SETUP_SQPOLL) {
3856                 if (flags & IORING_ENTER_SQ_WAKEUP)
3857                         wake_up(&ctx->sqo_wait);
3858                 submitted = to_submit;
3859         } else if (to_submit) {
3860                 to_submit = min(to_submit, ctx->sq_entries);
3861
3862                 mutex_lock(&ctx->uring_lock);
3863                 submitted = io_ring_submit(ctx, to_submit);
3864                 mutex_unlock(&ctx->uring_lock);
3865
3866                 if (submitted != to_submit)
3867                         goto out;
3868         }
3869         if (flags & IORING_ENTER_GETEVENTS) {
3870                 unsigned nr_events = 0;
3871
3872                 min_complete = min(min_complete, ctx->cq_entries);
3873
3874                 if (ctx->flags & IORING_SETUP_IOPOLL) {
3875                         ret = io_iopoll_check(ctx, &nr_events, min_complete);
3876                 } else {
3877                         ret = io_cqring_wait(ctx, min_complete, sig, sigsz);
3878                 }
3879         }
3880
3881 out:
3882         percpu_ref_put(&ctx->refs);
3883 out_fput:
3884         fdput(f);
3885         return submitted ? submitted : ret;
3886 }
3887
3888 static const struct file_operations io_uring_fops = {
3889         .release        = io_uring_release,
3890         .flush          = io_uring_flush,
3891         .mmap           = io_uring_mmap,
3892         .poll           = io_uring_poll,
3893         .fasync         = io_uring_fasync,
3894 };
3895
3896 static int io_allocate_scq_urings(struct io_ring_ctx *ctx,
3897                                   struct io_uring_params *p)
3898 {
3899         struct io_rings *rings;
3900         size_t size, sq_array_offset;
3901
3902         /* make sure these are sane, as we already accounted them */
3903         ctx->sq_entries = p->sq_entries;
3904         ctx->cq_entries = p->cq_entries;
3905
3906         size = rings_size(p->sq_entries, p->cq_entries, &sq_array_offset);
3907         if (size == SIZE_MAX)
3908                 return -EOVERFLOW;
3909
3910         rings = io_mem_alloc(size);
3911         if (!rings)
3912                 return -ENOMEM;
3913
3914         ctx->rings = rings;
3915         ctx->sq_array = (u32 *)((char *)rings + sq_array_offset);
3916         rings->sq_ring_mask = p->sq_entries - 1;
3917         rings->cq_ring_mask = p->cq_entries - 1;
3918         rings->sq_ring_entries = p->sq_entries;
3919         rings->cq_ring_entries = p->cq_entries;
3920         ctx->sq_mask = rings->sq_ring_mask;
3921         ctx->cq_mask = rings->cq_ring_mask;
3922
3923         size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);
3924         if (size == SIZE_MAX) {
3925                 io_mem_free(ctx->rings);
3926                 ctx->rings = NULL;
3927                 return -EOVERFLOW;
3928         }
3929
3930         ctx->sq_sqes = io_mem_alloc(size);
3931         if (!ctx->sq_sqes) {
3932                 io_mem_free(ctx->rings);
3933                 ctx->rings = NULL;
3934                 return -ENOMEM;
3935         }
3936
3937         return 0;
3938 }
3939
3940 /*
3941  * Allocate an anonymous fd, this is what constitutes the application
3942  * visible backing of an io_uring instance. The application mmaps this
3943  * fd to gain access to the SQ/CQ ring details. If UNIX sockets are enabled,
3944  * we have to tie this fd to a socket for file garbage collection purposes.
3945  */
3946 static int io_uring_get_fd(struct io_ring_ctx *ctx)
3947 {
3948         struct file *file;
3949         int ret;
3950
3951 #if defined(CONFIG_UNIX)
3952         ret = sock_create_kern(&init_net, PF_UNIX, SOCK_RAW, IPPROTO_IP,
3953                                 &ctx->ring_sock);
3954         if (ret)
3955                 return ret;
3956 #endif
3957
3958         ret = get_unused_fd_flags(O_RDWR | O_CLOEXEC);
3959         if (ret < 0)
3960                 goto err;
3961
3962         file = anon_inode_getfile("[io_uring]", &io_uring_fops, ctx,
3963                                         O_RDWR | O_CLOEXEC);
3964         if (IS_ERR(file)) {
3965                 put_unused_fd(ret);
3966                 ret = PTR_ERR(file);
3967                 goto err;
3968         }
3969
3970 #if defined(CONFIG_UNIX)
3971         ctx->ring_sock->file = file;
3972         ctx->ring_sock->sk->sk_user_data = ctx;
3973 #endif
3974         fd_install(ret, file);
3975         return ret;
3976 err:
3977 #if defined(CONFIG_UNIX)
3978         sock_release(ctx->ring_sock);
3979         ctx->ring_sock = NULL;
3980 #endif
3981         return ret;
3982 }
3983
3984 static int io_uring_create(unsigned entries, struct io_uring_params *p)
3985 {
3986         struct user_struct *user = NULL;
3987         struct io_ring_ctx *ctx;
3988         bool account_mem;
3989         int ret;
3990
3991         if (!entries || entries > IORING_MAX_ENTRIES)
3992                 return -EINVAL;
3993
3994         /*
3995          * Use twice as many entries for the CQ ring. It's possible for the
3996          * application to drive a higher depth than the size of the SQ ring,
3997          * since the sqes are only used at submission time. This allows for
3998          * some flexibility in overcommitting a bit.
3999          */
4000         p->sq_entries = roundup_pow_of_two(entries);
4001         p->cq_entries = 2 * p->sq_entries;
4002
4003         user = get_uid(current_user());
4004         account_mem = !capable(CAP_IPC_LOCK);
4005
4006         if (account_mem) {
4007                 ret = io_account_mem(user,
4008                                 ring_pages(p->sq_entries, p->cq_entries));
4009                 if (ret) {
4010                         free_uid(user);
4011                         return ret;
4012                 }
4013         }
4014
4015         ctx = io_ring_ctx_alloc(p);
4016         if (!ctx) {
4017                 if (account_mem)
4018                         io_unaccount_mem(user, ring_pages(p->sq_entries,
4019                                                                 p->cq_entries));
4020                 free_uid(user);
4021                 return -ENOMEM;
4022         }
4023         ctx->compat = in_compat_syscall();
4024         ctx->account_mem = account_mem;
4025         ctx->user = user;
4026
4027         ctx->creds = get_current_cred();
4028         if (!ctx->creds) {
4029                 ret = -ENOMEM;
4030                 goto err;
4031         }
4032
4033         ret = io_allocate_scq_urings(ctx, p);
4034         if (ret)
4035                 goto err;
4036
4037         ret = io_sq_offload_start(ctx, p);
4038         if (ret)
4039                 goto err;
4040
4041         memset(&p->sq_off, 0, sizeof(p->sq_off));
4042         p->sq_off.head = offsetof(struct io_rings, sq.head);
4043         p->sq_off.tail = offsetof(struct io_rings, sq.tail);
4044         p->sq_off.ring_mask = offsetof(struct io_rings, sq_ring_mask);
4045         p->sq_off.ring_entries = offsetof(struct io_rings, sq_ring_entries);
4046         p->sq_off.flags = offsetof(struct io_rings, sq_flags);
4047         p->sq_off.dropped = offsetof(struct io_rings, sq_dropped);
4048         p->sq_off.array = (char *)ctx->sq_array - (char *)ctx->rings;
4049
4050         memset(&p->cq_off, 0, sizeof(p->cq_off));
4051         p->cq_off.head = offsetof(struct io_rings, cq.head);
4052         p->cq_off.tail = offsetof(struct io_rings, cq.tail);
4053         p->cq_off.ring_mask = offsetof(struct io_rings, cq_ring_mask);
4054         p->cq_off.ring_entries = offsetof(struct io_rings, cq_ring_entries);
4055         p->cq_off.overflow = offsetof(struct io_rings, cq_overflow);
4056         p->cq_off.cqes = offsetof(struct io_rings, cqes);
4057
4058         /*
4059          * Install ring fd as the very last thing, so we don't risk someone
4060          * having closed it before we finish setup
4061          */
4062         ret = io_uring_get_fd(ctx);
4063         if (ret < 0)
4064                 goto err;
4065
4066         p->features = IORING_FEAT_SINGLE_MMAP;
4067         return ret;
4068 err:
4069         io_ring_ctx_wait_and_kill(ctx);
4070         return ret;
4071 }
4072
4073 /*
4074  * Sets up an aio uring context, and returns the fd. Applications asks for a
4075  * ring size, we return the actual sq/cq ring sizes (among other things) in the
4076  * params structure passed in.
4077  */
4078 static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
4079 {
4080         struct io_uring_params p;
4081         long ret;
4082         int i;
4083
4084         if (copy_from_user(&p, params, sizeof(p)))
4085                 return -EFAULT;
4086         for (i = 0; i < ARRAY_SIZE(p.resv); i++) {
4087                 if (p.resv[i])
4088                         return -EINVAL;
4089         }
4090
4091         if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
4092                         IORING_SETUP_SQ_AFF))
4093                 return -EINVAL;
4094
4095         ret = io_uring_create(entries, &p);
4096         if (ret < 0)
4097                 return ret;
4098
4099         if (copy_to_user(params, &p, sizeof(p)))
4100                 return -EFAULT;
4101
4102         return ret;
4103 }
4104
4105 SYSCALL_DEFINE2(io_uring_setup, u32, entries,
4106                 struct io_uring_params __user *, params)
4107 {
4108         return io_uring_setup(entries, params);
4109 }
4110
4111 static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
4112                                void __user *arg, unsigned nr_args)
4113         __releases(ctx->uring_lock)
4114         __acquires(ctx->uring_lock)
4115 {
4116         int ret;
4117
4118         /*
4119          * We're inside the ring mutex, if the ref is already dying, then
4120          * someone else killed the ctx or is already going through
4121          * io_uring_register().
4122          */
4123         if (percpu_ref_is_dying(&ctx->refs))
4124                 return -ENXIO;
4125
4126         percpu_ref_kill(&ctx->refs);
4127
4128         /*
4129          * Drop uring mutex before waiting for references to exit. If another
4130          * thread is currently inside io_uring_enter() it might need to grab
4131          * the uring_lock to make progress. If we hold it here across the drain
4132          * wait, then we can deadlock. It's safe to drop the mutex here, since
4133          * no new references will come in after we've killed the percpu ref.
4134          */
4135         mutex_unlock(&ctx->uring_lock);
4136         wait_for_completion(&ctx->ctx_done);
4137         mutex_lock(&ctx->uring_lock);
4138
4139         switch (opcode) {
4140         case IORING_REGISTER_BUFFERS:
4141                 ret = io_sqe_buffer_register(ctx, arg, nr_args);
4142                 break;
4143         case IORING_UNREGISTER_BUFFERS:
4144                 ret = -EINVAL;
4145                 if (arg || nr_args)
4146                         break;
4147                 ret = io_sqe_buffer_unregister(ctx);
4148                 break;
4149         case IORING_REGISTER_FILES:
4150                 ret = io_sqe_files_register(ctx, arg, nr_args);
4151                 break;
4152         case IORING_UNREGISTER_FILES:
4153                 ret = -EINVAL;
4154                 if (arg || nr_args)
4155                         break;
4156                 ret = io_sqe_files_unregister(ctx);
4157                 break;
4158         case IORING_REGISTER_EVENTFD:
4159                 ret = -EINVAL;
4160                 if (nr_args != 1)
4161                         break;
4162                 ret = io_eventfd_register(ctx, arg);
4163                 break;
4164         case IORING_UNREGISTER_EVENTFD:
4165                 ret = -EINVAL;
4166                 if (arg || nr_args)
4167                         break;
4168                 ret = io_eventfd_unregister(ctx);
4169                 break;
4170         default:
4171                 ret = -EINVAL;
4172                 break;
4173         }
4174
4175         /* bring the ctx back to life */
4176         reinit_completion(&ctx->ctx_done);
4177         percpu_ref_reinit(&ctx->refs);
4178         return ret;
4179 }
4180
4181 SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
4182                 void __user *, arg, unsigned int, nr_args)
4183 {
4184         struct io_ring_ctx *ctx;
4185         long ret = -EBADF;
4186         struct fd f;
4187
4188         f = fdget(fd);
4189         if (!f.file)
4190                 return -EBADF;
4191
4192         ret = -EOPNOTSUPP;
4193         if (f.file->f_op != &io_uring_fops)
4194                 goto out_fput;
4195
4196         ctx = f.file->private_data;
4197
4198         mutex_lock(&ctx->uring_lock);
4199         ret = __io_uring_register(ctx, opcode, arg, nr_args);
4200         mutex_unlock(&ctx->uring_lock);
4201 out_fput:
4202         fdput(f);
4203         return ret;
4204 }
4205
4206 static int __init io_uring_init(void)
4207 {
4208         req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC);
4209         return 0;
4210 };
4211 __initcall(io_uring_init);