GNU Linux-libre 6.1.90-gnu
[releases.git] / net / sunrpc / backchannel_rqst.c
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3
4 (c) 2007 Network Appliance, Inc.  All Rights Reserved.
5 (c) 2009 NetApp.  All Rights Reserved.
6
7
8 ******************************************************************************/
9
10 #include <linux/tcp.h>
11 #include <linux/slab.h>
12 #include <linux/sunrpc/xprt.h>
13 #include <linux/export.h>
14 #include <linux/sunrpc/bc_xprt.h>
15
16 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
17 #define RPCDBG_FACILITY RPCDBG_TRANS
18 #endif
19
20 #define BC_MAX_SLOTS    64U
21
22 unsigned int xprt_bc_max_slots(struct rpc_xprt *xprt)
23 {
24         return BC_MAX_SLOTS;
25 }
26
27 /*
28  * Helper routines that track the number of preallocation elements
29  * on the transport.
30  */
31 static inline int xprt_need_to_requeue(struct rpc_xprt *xprt)
32 {
33         return xprt->bc_alloc_count < xprt->bc_alloc_max;
34 }
35
36 /*
37  * Free the preallocated rpc_rqst structure and the memory
38  * buffers hanging off of it.
39  */
40 static void xprt_free_allocation(struct rpc_rqst *req)
41 {
42         struct xdr_buf *xbufp;
43
44         dprintk("RPC:        free allocations for req= %p\n", req);
45         WARN_ON_ONCE(test_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state));
46         xbufp = &req->rq_rcv_buf;
47         free_page((unsigned long)xbufp->head[0].iov_base);
48         xbufp = &req->rq_snd_buf;
49         free_page((unsigned long)xbufp->head[0].iov_base);
50         kfree(req);
51 }
52
53 static void xprt_bc_reinit_xdr_buf(struct xdr_buf *buf)
54 {
55         buf->head[0].iov_len = PAGE_SIZE;
56         buf->tail[0].iov_len = 0;
57         buf->pages = NULL;
58         buf->page_len = 0;
59         buf->flags = 0;
60         buf->len = 0;
61         buf->buflen = PAGE_SIZE;
62 }
63
64 static int xprt_alloc_xdr_buf(struct xdr_buf *buf, gfp_t gfp_flags)
65 {
66         struct page *page;
67         /* Preallocate one XDR receive buffer */
68         page = alloc_page(gfp_flags);
69         if (page == NULL)
70                 return -ENOMEM;
71         xdr_buf_init(buf, page_address(page), PAGE_SIZE);
72         return 0;
73 }
74
75 static struct rpc_rqst *xprt_alloc_bc_req(struct rpc_xprt *xprt)
76 {
77         gfp_t gfp_flags = GFP_KERNEL | __GFP_NORETRY | __GFP_NOWARN;
78         struct rpc_rqst *req;
79
80         /* Pre-allocate one backchannel rpc_rqst */
81         req = kzalloc(sizeof(*req), gfp_flags);
82         if (req == NULL)
83                 return NULL;
84
85         req->rq_xprt = xprt;
86         INIT_LIST_HEAD(&req->rq_bc_list);
87
88         /* Preallocate one XDR receive buffer */
89         if (xprt_alloc_xdr_buf(&req->rq_rcv_buf, gfp_flags) < 0) {
90                 printk(KERN_ERR "Failed to create bc receive xbuf\n");
91                 goto out_free;
92         }
93         req->rq_rcv_buf.len = PAGE_SIZE;
94
95         /* Preallocate one XDR send buffer */
96         if (xprt_alloc_xdr_buf(&req->rq_snd_buf, gfp_flags) < 0) {
97                 printk(KERN_ERR "Failed to create bc snd xbuf\n");
98                 goto out_free;
99         }
100         return req;
101 out_free:
102         xprt_free_allocation(req);
103         return NULL;
104 }
105
106 /*
107  * Preallocate up to min_reqs structures and related buffers for use
108  * by the backchannel.  This function can be called multiple times
109  * when creating new sessions that use the same rpc_xprt.  The
110  * preallocated buffers are added to the pool of resources used by
111  * the rpc_xprt.  Any one of these resources may be used by an
112  * incoming callback request.  It's up to the higher levels in the
113  * stack to enforce that the maximum number of session slots is not
114  * being exceeded.
115  *
116  * Some callback arguments can be large.  For example, a pNFS server
117  * using multiple deviceids.  The list can be unbound, but the client
118  * has the ability to tell the server the maximum size of the callback
119  * requests.  Each deviceID is 16 bytes, so allocate one page
120  * for the arguments to have enough room to receive a number of these
121  * deviceIDs.  The NFS client indicates to the pNFS server that its
122  * callback requests can be up to 4096 bytes in size.
123  */
124 int xprt_setup_backchannel(struct rpc_xprt *xprt, unsigned int min_reqs)
125 {
126         if (!xprt->ops->bc_setup)
127                 return 0;
128         return xprt->ops->bc_setup(xprt, min_reqs);
129 }
130 EXPORT_SYMBOL_GPL(xprt_setup_backchannel);
131
132 int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs)
133 {
134         struct rpc_rqst *req;
135         struct list_head tmp_list;
136         int i;
137
138         dprintk("RPC:       setup backchannel transport\n");
139
140         if (min_reqs > BC_MAX_SLOTS)
141                 min_reqs = BC_MAX_SLOTS;
142
143         /*
144          * We use a temporary list to keep track of the preallocated
145          * buffers.  Once we're done building the list we splice it
146          * into the backchannel preallocation list off of the rpc_xprt
147          * struct.  This helps minimize the amount of time the list
148          * lock is held on the rpc_xprt struct.  It also makes cleanup
149          * easier in case of memory allocation errors.
150          */
151         INIT_LIST_HEAD(&tmp_list);
152         for (i = 0; i < min_reqs; i++) {
153                 /* Pre-allocate one backchannel rpc_rqst */
154                 req = xprt_alloc_bc_req(xprt);
155                 if (req == NULL) {
156                         printk(KERN_ERR "Failed to create bc rpc_rqst\n");
157                         goto out_free;
158                 }
159
160                 /* Add the allocated buffer to the tmp list */
161                 dprintk("RPC:       adding req= %p\n", req);
162                 list_add(&req->rq_bc_pa_list, &tmp_list);
163         }
164
165         /*
166          * Add the temporary list to the backchannel preallocation list
167          */
168         spin_lock(&xprt->bc_pa_lock);
169         list_splice(&tmp_list, &xprt->bc_pa_list);
170         xprt->bc_alloc_count += min_reqs;
171         xprt->bc_alloc_max += min_reqs;
172         atomic_add(min_reqs, &xprt->bc_slot_count);
173         spin_unlock(&xprt->bc_pa_lock);
174
175         dprintk("RPC:       setup backchannel transport done\n");
176         return 0;
177
178 out_free:
179         /*
180          * Memory allocation failed, free the temporary list
181          */
182         while (!list_empty(&tmp_list)) {
183                 req = list_first_entry(&tmp_list,
184                                 struct rpc_rqst,
185                                 rq_bc_pa_list);
186                 list_del(&req->rq_bc_pa_list);
187                 xprt_free_allocation(req);
188         }
189
190         dprintk("RPC:       setup backchannel transport failed\n");
191         return -ENOMEM;
192 }
193
194 /**
195  * xprt_destroy_backchannel - Destroys the backchannel preallocated structures.
196  * @xprt:       the transport holding the preallocated strucures
197  * @max_reqs:   the maximum number of preallocated structures to destroy
198  *
199  * Since these structures may have been allocated by multiple calls
200  * to xprt_setup_backchannel, we only destroy up to the maximum number
201  * of reqs specified by the caller.
202  */
203 void xprt_destroy_backchannel(struct rpc_xprt *xprt, unsigned int max_reqs)
204 {
205         if (xprt->ops->bc_destroy)
206                 xprt->ops->bc_destroy(xprt, max_reqs);
207 }
208 EXPORT_SYMBOL_GPL(xprt_destroy_backchannel);
209
210 void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs)
211 {
212         struct rpc_rqst *req = NULL, *tmp = NULL;
213
214         dprintk("RPC:        destroy backchannel transport\n");
215
216         if (max_reqs == 0)
217                 goto out;
218
219         spin_lock_bh(&xprt->bc_pa_lock);
220         xprt->bc_alloc_max -= min(max_reqs, xprt->bc_alloc_max);
221         list_for_each_entry_safe(req, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
222                 dprintk("RPC:        req=%p\n", req);
223                 list_del(&req->rq_bc_pa_list);
224                 xprt_free_allocation(req);
225                 xprt->bc_alloc_count--;
226                 atomic_dec(&xprt->bc_slot_count);
227                 if (--max_reqs == 0)
228                         break;
229         }
230         spin_unlock_bh(&xprt->bc_pa_lock);
231
232 out:
233         dprintk("RPC:        backchannel list empty= %s\n",
234                 list_empty(&xprt->bc_pa_list) ? "true" : "false");
235 }
236
237 static struct rpc_rqst *xprt_get_bc_request(struct rpc_xprt *xprt, __be32 xid,
238                 struct rpc_rqst *new)
239 {
240         struct rpc_rqst *req = NULL;
241
242         dprintk("RPC:       allocate a backchannel request\n");
243         if (list_empty(&xprt->bc_pa_list)) {
244                 if (!new)
245                         goto not_found;
246                 if (atomic_read(&xprt->bc_slot_count) >= BC_MAX_SLOTS)
247                         goto not_found;
248                 list_add_tail(&new->rq_bc_pa_list, &xprt->bc_pa_list);
249                 xprt->bc_alloc_count++;
250                 atomic_inc(&xprt->bc_slot_count);
251         }
252         req = list_first_entry(&xprt->bc_pa_list, struct rpc_rqst,
253                                 rq_bc_pa_list);
254         req->rq_reply_bytes_recvd = 0;
255         memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
256                         sizeof(req->rq_private_buf));
257         req->rq_xid = xid;
258         req->rq_connect_cookie = xprt->connect_cookie;
259         dprintk("RPC:       backchannel req=%p\n", req);
260 not_found:
261         return req;
262 }
263
264 /*
265  * Return the preallocated rpc_rqst structure and XDR buffers
266  * associated with this rpc_task.
267  */
268 void xprt_free_bc_request(struct rpc_rqst *req)
269 {
270         struct rpc_xprt *xprt = req->rq_xprt;
271
272         xprt->ops->bc_free_rqst(req);
273 }
274
275 void xprt_free_bc_rqst(struct rpc_rqst *req)
276 {
277         struct rpc_xprt *xprt = req->rq_xprt;
278
279         dprintk("RPC:       free backchannel req=%p\n", req);
280
281         req->rq_connect_cookie = xprt->connect_cookie - 1;
282         smp_mb__before_atomic();
283         clear_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state);
284         smp_mb__after_atomic();
285
286         /*
287          * Return it to the list of preallocations so that it
288          * may be reused by a new callback request.
289          */
290         spin_lock_bh(&xprt->bc_pa_lock);
291         if (xprt_need_to_requeue(xprt)) {
292                 xprt_bc_reinit_xdr_buf(&req->rq_snd_buf);
293                 xprt_bc_reinit_xdr_buf(&req->rq_rcv_buf);
294                 req->rq_rcv_buf.len = PAGE_SIZE;
295                 list_add_tail(&req->rq_bc_pa_list, &xprt->bc_pa_list);
296                 xprt->bc_alloc_count++;
297                 atomic_inc(&xprt->bc_slot_count);
298                 req = NULL;
299         }
300         spin_unlock_bh(&xprt->bc_pa_lock);
301         if (req != NULL) {
302                 /*
303                  * The last remaining session was destroyed while this
304                  * entry was in use.  Free the entry and don't attempt
305                  * to add back to the list because there is no need to
306                  * have anymore preallocated entries.
307                  */
308                 dprintk("RPC:       Last session removed req=%p\n", req);
309                 xprt_free_allocation(req);
310         }
311         xprt_put(xprt);
312 }
313
314 /*
315  * One or more rpc_rqst structure have been preallocated during the
316  * backchannel setup.  Buffer space for the send and private XDR buffers
317  * has been preallocated as well.  Use xprt_alloc_bc_request to allocate
318  * to this request.  Use xprt_free_bc_request to return it.
319  *
320  * We know that we're called in soft interrupt context, grab the spin_lock
321  * since there is no need to grab the bottom half spin_lock.
322  *
323  * Return an available rpc_rqst, otherwise NULL if non are available.
324  */
325 struct rpc_rqst *xprt_lookup_bc_request(struct rpc_xprt *xprt, __be32 xid)
326 {
327         struct rpc_rqst *req, *new = NULL;
328
329         do {
330                 spin_lock(&xprt->bc_pa_lock);
331                 list_for_each_entry(req, &xprt->bc_pa_list, rq_bc_pa_list) {
332                         if (req->rq_connect_cookie != xprt->connect_cookie)
333                                 continue;
334                         if (req->rq_xid == xid)
335                                 goto found;
336                 }
337                 req = xprt_get_bc_request(xprt, xid, new);
338 found:
339                 spin_unlock(&xprt->bc_pa_lock);
340                 if (new) {
341                         if (req != new)
342                                 xprt_free_allocation(new);
343                         break;
344                 } else if (req)
345                         break;
346                 new = xprt_alloc_bc_req(xprt);
347         } while (new);
348         return req;
349 }
350
351 /*
352  * Add callback request to callback list.  The callback
353  * service sleeps on the sv_cb_waitq waiting for new
354  * requests.  Wake it up after adding enqueing the
355  * request.
356  */
357 void xprt_complete_bc_request(struct rpc_rqst *req, uint32_t copied)
358 {
359         struct rpc_xprt *xprt = req->rq_xprt;
360         struct svc_serv *bc_serv = xprt->bc_serv;
361
362         spin_lock(&xprt->bc_pa_lock);
363         list_del(&req->rq_bc_pa_list);
364         xprt->bc_alloc_count--;
365         spin_unlock(&xprt->bc_pa_lock);
366
367         req->rq_private_buf.len = copied;
368         set_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state);
369
370         dprintk("RPC:       add callback request to list\n");
371         xprt_get(xprt);
372         spin_lock(&bc_serv->sv_cb_lock);
373         list_add(&req->rq_bc_list, &bc_serv->sv_cb_list);
374         wake_up(&bc_serv->sv_cb_waitq);
375         spin_unlock(&bc_serv->sv_cb_lock);
376 }