GNU Linux-libre 6.1.90-gnu
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  *  linux/net/sunrpc/clnt.c
4  *
5  *  This file contains the high-level RPC interface.
6  *  It is modeled as a finite state machine to support both synchronous
7  *  and asynchronous requests.
8  *
9  *  -   RPC header generation and argument serialization.
10  *  -   Credential refresh.
11  *  -   TCP connect handling.
12  *  -   Retry of operation when it is suspected the operation failed because
13  *      of uid squashing on the server, or when the credentials were stale
14  *      and need to be refreshed, or when a packet was damaged in transit.
15  *      This may have to be moved to the VFS layer.
16  *
17  *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
18  *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
19  */
20
21
22 #include <linux/module.h>
23 #include <linux/types.h>
24 #include <linux/kallsyms.h>
25 #include <linux/mm.h>
26 #include <linux/namei.h>
27 #include <linux/mount.h>
28 #include <linux/slab.h>
29 #include <linux/rcupdate.h>
30 #include <linux/utsname.h>
31 #include <linux/workqueue.h>
32 #include <linux/in.h>
33 #include <linux/in6.h>
34 #include <linux/un.h>
35
36 #include <linux/sunrpc/clnt.h>
37 #include <linux/sunrpc/addr.h>
38 #include <linux/sunrpc/rpc_pipe_fs.h>
39 #include <linux/sunrpc/metrics.h>
40 #include <linux/sunrpc/bc_xprt.h>
41 #include <trace/events/sunrpc.h>
42
43 #include "sunrpc.h"
44 #include "sysfs.h"
45 #include "netns.h"
46
47 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
48 # define RPCDBG_FACILITY        RPCDBG_CALL
49 #endif
50
51 /*
52  * All RPC clients are linked into the per-network-namespace sn->all_clients list
53  */
54
55 static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
56
57
58 static void     call_start(struct rpc_task *task);
59 static void     call_reserve(struct rpc_task *task);
60 static void     call_reserveresult(struct rpc_task *task);
61 static void     call_allocate(struct rpc_task *task);
62 static void     call_encode(struct rpc_task *task);
63 static void     call_decode(struct rpc_task *task);
64 static void     call_bind(struct rpc_task *task);
65 static void     call_bind_status(struct rpc_task *task);
66 static void     call_transmit(struct rpc_task *task);
67 static void     call_status(struct rpc_task *task);
68 static void     call_transmit_status(struct rpc_task *task);
69 static void     call_refresh(struct rpc_task *task);
70 static void     call_refreshresult(struct rpc_task *task);
71 static void     call_connect(struct rpc_task *task);
72 static void     call_connect_status(struct rpc_task *task);
73
74 static int      rpc_encode_header(struct rpc_task *task,
75                                   struct xdr_stream *xdr);
76 static int      rpc_decode_header(struct rpc_task *task,
77                                   struct xdr_stream *xdr);
78 static int      rpc_ping(struct rpc_clnt *clnt);
79 static int      rpc_ping_noreply(struct rpc_clnt *clnt);
80 static void     rpc_check_timeout(struct rpc_task *task);
81
82 static void rpc_register_client(struct rpc_clnt *clnt)
83 {
84         struct net *net = rpc_net_ns(clnt);
85         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
86
87         spin_lock(&sn->rpc_client_lock);
88         list_add(&clnt->cl_clients, &sn->all_clients);
89         spin_unlock(&sn->rpc_client_lock);
90 }
91
92 static void rpc_unregister_client(struct rpc_clnt *clnt)
93 {
94         struct net *net = rpc_net_ns(clnt);
95         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
96
97         spin_lock(&sn->rpc_client_lock);
98         list_del(&clnt->cl_clients);
99         spin_unlock(&sn->rpc_client_lock);
100 }
101
102 static void __rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
103 {
104         rpc_remove_client_dir(clnt);
105 }
106
107 static void rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
108 {
109         struct net *net = rpc_net_ns(clnt);
110         struct super_block *pipefs_sb;
111
112         pipefs_sb = rpc_get_sb_net(net);
113         if (pipefs_sb) {
114                 if (pipefs_sb == clnt->pipefs_sb)
115                         __rpc_clnt_remove_pipedir(clnt);
116                 rpc_put_sb_net(net);
117         }
118 }
119
120 static struct dentry *rpc_setup_pipedir_sb(struct super_block *sb,
121                                     struct rpc_clnt *clnt)
122 {
123         static uint32_t clntid;
124         const char *dir_name = clnt->cl_program->pipe_dir_name;
125         char name[15];
126         struct dentry *dir, *dentry;
127
128         dir = rpc_d_lookup_sb(sb, dir_name);
129         if (dir == NULL) {
130                 pr_info("RPC: pipefs directory doesn't exist: %s\n", dir_name);
131                 return dir;
132         }
133         for (;;) {
134                 snprintf(name, sizeof(name), "clnt%x", (unsigned int)clntid++);
135                 name[sizeof(name) - 1] = '\0';
136                 dentry = rpc_create_client_dir(dir, name, clnt);
137                 if (!IS_ERR(dentry))
138                         break;
139                 if (dentry == ERR_PTR(-EEXIST))
140                         continue;
141                 printk(KERN_INFO "RPC: Couldn't create pipefs entry"
142                                 " %s/%s, error %ld\n",
143                                 dir_name, name, PTR_ERR(dentry));
144                 break;
145         }
146         dput(dir);
147         return dentry;
148 }
149
150 static int
151 rpc_setup_pipedir(struct super_block *pipefs_sb, struct rpc_clnt *clnt)
152 {
153         struct dentry *dentry;
154
155         clnt->pipefs_sb = pipefs_sb;
156
157         if (clnt->cl_program->pipe_dir_name != NULL) {
158                 dentry = rpc_setup_pipedir_sb(pipefs_sb, clnt);
159                 if (IS_ERR(dentry))
160                         return PTR_ERR(dentry);
161         }
162         return 0;
163 }
164
165 static int rpc_clnt_skip_event(struct rpc_clnt *clnt, unsigned long event)
166 {
167         if (clnt->cl_program->pipe_dir_name == NULL)
168                 return 1;
169
170         switch (event) {
171         case RPC_PIPEFS_MOUNT:
172                 if (clnt->cl_pipedir_objects.pdh_dentry != NULL)
173                         return 1;
174                 if (refcount_read(&clnt->cl_count) == 0)
175                         return 1;
176                 break;
177         case RPC_PIPEFS_UMOUNT:
178                 if (clnt->cl_pipedir_objects.pdh_dentry == NULL)
179                         return 1;
180                 break;
181         }
182         return 0;
183 }
184
185 static int __rpc_clnt_handle_event(struct rpc_clnt *clnt, unsigned long event,
186                                    struct super_block *sb)
187 {
188         struct dentry *dentry;
189
190         switch (event) {
191         case RPC_PIPEFS_MOUNT:
192                 dentry = rpc_setup_pipedir_sb(sb, clnt);
193                 if (!dentry)
194                         return -ENOENT;
195                 if (IS_ERR(dentry))
196                         return PTR_ERR(dentry);
197                 break;
198         case RPC_PIPEFS_UMOUNT:
199                 __rpc_clnt_remove_pipedir(clnt);
200                 break;
201         default:
202                 printk(KERN_ERR "%s: unknown event: %ld\n", __func__, event);
203                 return -ENOTSUPP;
204         }
205         return 0;
206 }
207
208 static int __rpc_pipefs_event(struct rpc_clnt *clnt, unsigned long event,
209                                 struct super_block *sb)
210 {
211         int error = 0;
212
213         for (;; clnt = clnt->cl_parent) {
214                 if (!rpc_clnt_skip_event(clnt, event))
215                         error = __rpc_clnt_handle_event(clnt, event, sb);
216                 if (error || clnt == clnt->cl_parent)
217                         break;
218         }
219         return error;
220 }
221
222 static struct rpc_clnt *rpc_get_client_for_event(struct net *net, int event)
223 {
224         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
225         struct rpc_clnt *clnt;
226
227         spin_lock(&sn->rpc_client_lock);
228         list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
229                 if (rpc_clnt_skip_event(clnt, event))
230                         continue;
231                 spin_unlock(&sn->rpc_client_lock);
232                 return clnt;
233         }
234         spin_unlock(&sn->rpc_client_lock);
235         return NULL;
236 }
237
238 static int rpc_pipefs_event(struct notifier_block *nb, unsigned long event,
239                             void *ptr)
240 {
241         struct super_block *sb = ptr;
242         struct rpc_clnt *clnt;
243         int error = 0;
244
245         while ((clnt = rpc_get_client_for_event(sb->s_fs_info, event))) {
246                 error = __rpc_pipefs_event(clnt, event, sb);
247                 if (error)
248                         break;
249         }
250         return error;
251 }
252
253 static struct notifier_block rpc_clients_block = {
254         .notifier_call  = rpc_pipefs_event,
255         .priority       = SUNRPC_PIPEFS_RPC_PRIO,
256 };
257
258 int rpc_clients_notifier_register(void)
259 {
260         return rpc_pipefs_notifier_register(&rpc_clients_block);
261 }
262
263 void rpc_clients_notifier_unregister(void)
264 {
265         return rpc_pipefs_notifier_unregister(&rpc_clients_block);
266 }
267
268 static struct rpc_xprt *rpc_clnt_set_transport(struct rpc_clnt *clnt,
269                 struct rpc_xprt *xprt,
270                 const struct rpc_timeout *timeout)
271 {
272         struct rpc_xprt *old;
273
274         spin_lock(&clnt->cl_lock);
275         old = rcu_dereference_protected(clnt->cl_xprt,
276                         lockdep_is_held(&clnt->cl_lock));
277
278         if (!xprt_bound(xprt))
279                 clnt->cl_autobind = 1;
280
281         clnt->cl_timeout = timeout;
282         rcu_assign_pointer(clnt->cl_xprt, xprt);
283         spin_unlock(&clnt->cl_lock);
284
285         return old;
286 }
287
288 static void rpc_clnt_set_nodename(struct rpc_clnt *clnt, const char *nodename)
289 {
290         clnt->cl_nodelen = strlcpy(clnt->cl_nodename,
291                         nodename, sizeof(clnt->cl_nodename));
292 }
293
294 static int rpc_client_register(struct rpc_clnt *clnt,
295                                rpc_authflavor_t pseudoflavor,
296                                const char *client_name)
297 {
298         struct rpc_auth_create_args auth_args = {
299                 .pseudoflavor = pseudoflavor,
300                 .target_name = client_name,
301         };
302         struct rpc_auth *auth;
303         struct net *net = rpc_net_ns(clnt);
304         struct super_block *pipefs_sb;
305         int err;
306
307         rpc_clnt_debugfs_register(clnt);
308
309         pipefs_sb = rpc_get_sb_net(net);
310         if (pipefs_sb) {
311                 err = rpc_setup_pipedir(pipefs_sb, clnt);
312                 if (err)
313                         goto out;
314         }
315
316         rpc_register_client(clnt);
317         if (pipefs_sb)
318                 rpc_put_sb_net(net);
319
320         auth = rpcauth_create(&auth_args, clnt);
321         if (IS_ERR(auth)) {
322                 dprintk("RPC:       Couldn't create auth handle (flavor %u)\n",
323                                 pseudoflavor);
324                 err = PTR_ERR(auth);
325                 goto err_auth;
326         }
327         return 0;
328 err_auth:
329         pipefs_sb = rpc_get_sb_net(net);
330         rpc_unregister_client(clnt);
331         __rpc_clnt_remove_pipedir(clnt);
332 out:
333         if (pipefs_sb)
334                 rpc_put_sb_net(net);
335         rpc_sysfs_client_destroy(clnt);
336         rpc_clnt_debugfs_unregister(clnt);
337         return err;
338 }
339
340 static DEFINE_IDA(rpc_clids);
341
342 void rpc_cleanup_clids(void)
343 {
344         ida_destroy(&rpc_clids);
345 }
346
347 static int rpc_alloc_clid(struct rpc_clnt *clnt)
348 {
349         int clid;
350
351         clid = ida_alloc(&rpc_clids, GFP_KERNEL);
352         if (clid < 0)
353                 return clid;
354         clnt->cl_clid = clid;
355         return 0;
356 }
357
358 static void rpc_free_clid(struct rpc_clnt *clnt)
359 {
360         ida_free(&rpc_clids, clnt->cl_clid);
361 }
362
363 static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args,
364                 struct rpc_xprt_switch *xps,
365                 struct rpc_xprt *xprt,
366                 struct rpc_clnt *parent)
367 {
368         const struct rpc_program *program = args->program;
369         const struct rpc_version *version;
370         struct rpc_clnt *clnt = NULL;
371         const struct rpc_timeout *timeout;
372         const char *nodename = args->nodename;
373         int err;
374
375         err = rpciod_up();
376         if (err)
377                 goto out_no_rpciod;
378
379         err = -EINVAL;
380         if (args->version >= program->nrvers)
381                 goto out_err;
382         version = program->version[args->version];
383         if (version == NULL)
384                 goto out_err;
385
386         err = -ENOMEM;
387         clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
388         if (!clnt)
389                 goto out_err;
390         clnt->cl_parent = parent ? : clnt;
391
392         err = rpc_alloc_clid(clnt);
393         if (err)
394                 goto out_no_clid;
395
396         clnt->cl_cred     = get_cred(args->cred);
397         clnt->cl_procinfo = version->procs;
398         clnt->cl_maxproc  = version->nrprocs;
399         clnt->cl_prog     = args->prognumber ? : program->number;
400         clnt->cl_vers     = version->number;
401         clnt->cl_stats    = program->stats;
402         clnt->cl_metrics  = rpc_alloc_iostats(clnt);
403         rpc_init_pipe_dir_head(&clnt->cl_pipedir_objects);
404         err = -ENOMEM;
405         if (clnt->cl_metrics == NULL)
406                 goto out_no_stats;
407         clnt->cl_program  = program;
408         INIT_LIST_HEAD(&clnt->cl_tasks);
409         spin_lock_init(&clnt->cl_lock);
410
411         timeout = xprt->timeout;
412         if (args->timeout != NULL) {
413                 memcpy(&clnt->cl_timeout_default, args->timeout,
414                                 sizeof(clnt->cl_timeout_default));
415                 timeout = &clnt->cl_timeout_default;
416         }
417
418         rpc_clnt_set_transport(clnt, xprt, timeout);
419         xprt->main = true;
420         xprt_iter_init(&clnt->cl_xpi, xps);
421         xprt_switch_put(xps);
422
423         clnt->cl_rtt = &clnt->cl_rtt_default;
424         rpc_init_rtt(&clnt->cl_rtt_default, clnt->cl_timeout->to_initval);
425
426         refcount_set(&clnt->cl_count, 1);
427
428         if (nodename == NULL)
429                 nodename = utsname()->nodename;
430         /* save the nodename */
431         rpc_clnt_set_nodename(clnt, nodename);
432
433         rpc_sysfs_client_setup(clnt, xps, rpc_net_ns(clnt));
434         err = rpc_client_register(clnt, args->authflavor, args->client_name);
435         if (err)
436                 goto out_no_path;
437         if (parent)
438                 refcount_inc(&parent->cl_count);
439
440         trace_rpc_clnt_new(clnt, xprt, program->name, args->servername);
441         return clnt;
442
443 out_no_path:
444         rpc_free_iostats(clnt->cl_metrics);
445 out_no_stats:
446         put_cred(clnt->cl_cred);
447         rpc_free_clid(clnt);
448 out_no_clid:
449         kfree(clnt);
450 out_err:
451         rpciod_down();
452 out_no_rpciod:
453         xprt_switch_put(xps);
454         xprt_put(xprt);
455         trace_rpc_clnt_new_err(program->name, args->servername, err);
456         return ERR_PTR(err);
457 }
458
459 static struct rpc_clnt *rpc_create_xprt(struct rpc_create_args *args,
460                                         struct rpc_xprt *xprt)
461 {
462         struct rpc_clnt *clnt = NULL;
463         struct rpc_xprt_switch *xps;
464
465         if (args->bc_xprt && args->bc_xprt->xpt_bc_xps) {
466                 WARN_ON_ONCE(!(args->protocol & XPRT_TRANSPORT_BC));
467                 xps = args->bc_xprt->xpt_bc_xps;
468                 xprt_switch_get(xps);
469         } else {
470                 xps = xprt_switch_alloc(xprt, GFP_KERNEL);
471                 if (xps == NULL) {
472                         xprt_put(xprt);
473                         return ERR_PTR(-ENOMEM);
474                 }
475                 if (xprt->bc_xprt) {
476                         xprt_switch_get(xps);
477                         xprt->bc_xprt->xpt_bc_xps = xps;
478                 }
479         }
480         clnt = rpc_new_client(args, xps, xprt, NULL);
481         if (IS_ERR(clnt))
482                 return clnt;
483
484         if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
485                 int err = rpc_ping(clnt);
486                 if (err != 0) {
487                         rpc_shutdown_client(clnt);
488                         return ERR_PTR(err);
489                 }
490         } else if (args->flags & RPC_CLNT_CREATE_CONNECTED) {
491                 int err = rpc_ping_noreply(clnt);
492                 if (err != 0) {
493                         rpc_shutdown_client(clnt);
494                         return ERR_PTR(err);
495                 }
496         }
497
498         clnt->cl_softrtry = 1;
499         if (args->flags & (RPC_CLNT_CREATE_HARDRTRY|RPC_CLNT_CREATE_SOFTERR)) {
500                 clnt->cl_softrtry = 0;
501                 if (args->flags & RPC_CLNT_CREATE_SOFTERR)
502                         clnt->cl_softerr = 1;
503         }
504
505         if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
506                 clnt->cl_autobind = 1;
507         if (args->flags & RPC_CLNT_CREATE_NO_RETRANS_TIMEOUT)
508                 clnt->cl_noretranstimeo = 1;
509         if (args->flags & RPC_CLNT_CREATE_DISCRTRY)
510                 clnt->cl_discrtry = 1;
511         if (!(args->flags & RPC_CLNT_CREATE_QUIET))
512                 clnt->cl_chatty = 1;
513
514         return clnt;
515 }
516
517 /**
518  * rpc_create - create an RPC client and transport with one call
519  * @args: rpc_clnt create argument structure
520  *
521  * Creates and initializes an RPC transport and an RPC client.
522  *
523  * It can ping the server in order to determine if it is up, and to see if
524  * it supports this program and version.  RPC_CLNT_CREATE_NOPING disables
525  * this behavior so asynchronous tasks can also use rpc_create.
526  */
527 struct rpc_clnt *rpc_create(struct rpc_create_args *args)
528 {
529         struct rpc_xprt *xprt;
530         struct xprt_create xprtargs = {
531                 .net = args->net,
532                 .ident = args->protocol,
533                 .srcaddr = args->saddress,
534                 .dstaddr = args->address,
535                 .addrlen = args->addrsize,
536                 .servername = args->servername,
537                 .bc_xprt = args->bc_xprt,
538         };
539         char servername[48];
540         struct rpc_clnt *clnt;
541         int i;
542
543         if (args->bc_xprt) {
544                 WARN_ON_ONCE(!(args->protocol & XPRT_TRANSPORT_BC));
545                 xprt = args->bc_xprt->xpt_bc_xprt;
546                 if (xprt) {
547                         xprt_get(xprt);
548                         return rpc_create_xprt(args, xprt);
549                 }
550         }
551
552         if (args->flags & RPC_CLNT_CREATE_INFINITE_SLOTS)
553                 xprtargs.flags |= XPRT_CREATE_INFINITE_SLOTS;
554         if (args->flags & RPC_CLNT_CREATE_NO_IDLE_TIMEOUT)
555                 xprtargs.flags |= XPRT_CREATE_NO_IDLE_TIMEOUT;
556         /*
557          * If the caller chooses not to specify a hostname, whip
558          * up a string representation of the passed-in address.
559          */
560         if (xprtargs.servername == NULL) {
561                 struct sockaddr_un *sun =
562                                 (struct sockaddr_un *)args->address;
563                 struct sockaddr_in *sin =
564                                 (struct sockaddr_in *)args->address;
565                 struct sockaddr_in6 *sin6 =
566                                 (struct sockaddr_in6 *)args->address;
567
568                 servername[0] = '\0';
569                 switch (args->address->sa_family) {
570                 case AF_LOCAL:
571                         snprintf(servername, sizeof(servername), "%s",
572                                  sun->sun_path);
573                         break;
574                 case AF_INET:
575                         snprintf(servername, sizeof(servername), "%pI4",
576                                  &sin->sin_addr.s_addr);
577                         break;
578                 case AF_INET6:
579                         snprintf(servername, sizeof(servername), "%pI6",
580                                  &sin6->sin6_addr);
581                         break;
582                 default:
583                         /* caller wants default server name, but
584                          * address family isn't recognized. */
585                         return ERR_PTR(-EINVAL);
586                 }
587                 xprtargs.servername = servername;
588         }
589
590         xprt = xprt_create_transport(&xprtargs);
591         if (IS_ERR(xprt))
592                 return (struct rpc_clnt *)xprt;
593
594         /*
595          * By default, kernel RPC client connects from a reserved port.
596          * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
597          * but it is always enabled for rpciod, which handles the connect
598          * operation.
599          */
600         xprt->resvport = 1;
601         if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
602                 xprt->resvport = 0;
603         xprt->reuseport = 0;
604         if (args->flags & RPC_CLNT_CREATE_REUSEPORT)
605                 xprt->reuseport = 1;
606
607         clnt = rpc_create_xprt(args, xprt);
608         if (IS_ERR(clnt) || args->nconnect <= 1)
609                 return clnt;
610
611         for (i = 0; i < args->nconnect - 1; i++) {
612                 if (rpc_clnt_add_xprt(clnt, &xprtargs, NULL, NULL) < 0)
613                         break;
614         }
615         return clnt;
616 }
617 EXPORT_SYMBOL_GPL(rpc_create);
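/*
 * Illustrative only, not part of this file: a minimal sketch of how a
 * caller might fill in struct rpc_create_args and call rpc_create().
 * The program definition ("my_program"), the server address variable
 * and the version number below are hypothetical placeholders; real
 * callers (NFS, rpcbind, lockd, ...) supply their own.
 *
 *	struct rpc_create_args args = {
 *		.net		= &init_net,
 *		.protocol	= XPRT_TRANSPORT_TCP,
 *		.address	= (struct sockaddr *)&server_addr,
 *		.addrsize	= sizeof(server_addr),
 *		.servername	= "server.example.org",
 *		.program	= &my_program,
 *		.version	= 3,
 *		.authflavor	= RPC_AUTH_UNIX,
 *		.flags		= RPC_CLNT_CREATE_NOPING,
 *	};
 *	struct rpc_clnt *clnt;
 *
 *	clnt = rpc_create(&args);
 *	if (IS_ERR(clnt))
 *		return PTR_ERR(clnt);
 */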
618
619 /*
620  * This function clones the RPC client structure. It allows us to share the
621  * same transport while varying parameters such as the authentication
622  * flavour.
623  */
624 static struct rpc_clnt *__rpc_clone_client(struct rpc_create_args *args,
625                                            struct rpc_clnt *clnt)
626 {
627         struct rpc_xprt_switch *xps;
628         struct rpc_xprt *xprt;
629         struct rpc_clnt *new;
630         int err;
631
632         err = -ENOMEM;
633         rcu_read_lock();
634         xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
635         xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
636         rcu_read_unlock();
637         if (xprt == NULL || xps == NULL) {
638                 xprt_put(xprt);
639                 xprt_switch_put(xps);
640                 goto out_err;
641         }
642         args->servername = xprt->servername;
643         args->nodename = clnt->cl_nodename;
644
645         new = rpc_new_client(args, xps, xprt, clnt);
646         if (IS_ERR(new))
647                 return new;
648
649         /* Turn off autobind on clones */
650         new->cl_autobind = 0;
651         new->cl_softrtry = clnt->cl_softrtry;
652         new->cl_softerr = clnt->cl_softerr;
653         new->cl_noretranstimeo = clnt->cl_noretranstimeo;
654         new->cl_discrtry = clnt->cl_discrtry;
655         new->cl_chatty = clnt->cl_chatty;
656         new->cl_principal = clnt->cl_principal;
657         new->cl_max_connect = clnt->cl_max_connect;
658         return new;
659
660 out_err:
661         trace_rpc_clnt_clone_err(clnt, err);
662         return ERR_PTR(err);
663 }
664
665 /**
666  * rpc_clone_client - Clone an RPC client structure
667  *
668  * @clnt: RPC client whose parameters are copied
669  *
670  * Returns a fresh RPC client or an ERR_PTR.
671  */
672 struct rpc_clnt *rpc_clone_client(struct rpc_clnt *clnt)
673 {
674         struct rpc_create_args args = {
675                 .program        = clnt->cl_program,
676                 .prognumber     = clnt->cl_prog,
677                 .version        = clnt->cl_vers,
678                 .authflavor     = clnt->cl_auth->au_flavor,
679                 .cred           = clnt->cl_cred,
680         };
681         return __rpc_clone_client(&args, clnt);
682 }
683 EXPORT_SYMBOL_GPL(rpc_clone_client);
684
685 /**
686  * rpc_clone_client_set_auth - Clone an RPC client structure and set its auth
687  *
688  * @clnt: RPC client whose parameters are copied
689  * @flavor: security flavor for new client
690  *
691  * Returns a fresh RPC client or an ERR_PTR.
692  */
693 struct rpc_clnt *
694 rpc_clone_client_set_auth(struct rpc_clnt *clnt, rpc_authflavor_t flavor)
695 {
696         struct rpc_create_args args = {
697                 .program        = clnt->cl_program,
698                 .prognumber     = clnt->cl_prog,
699                 .version        = clnt->cl_vers,
700                 .authflavor     = flavor,
701                 .cred           = clnt->cl_cred,
702         };
703         return __rpc_clone_client(&args, clnt);
704 }
705 EXPORT_SYMBOL_GPL(rpc_clone_client_set_auth);
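/*
 * Illustrative only: cloning an existing client so that a subset of
 * calls can use RPCSEC_GSS with Kerberos v5 while still sharing the
 * parent's transport.  "clnt" is assumed to be a previously created
 * struct rpc_clnt.
 *
 *	struct rpc_clnt *gss_clnt;
 *
 *	gss_clnt = rpc_clone_client_set_auth(clnt, RPC_AUTH_GSS_KRB5);
 *	if (IS_ERR(gss_clnt))
 *		return PTR_ERR(gss_clnt);
 */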
706
707 /**
708  * rpc_switch_client_transport: switch the RPC transport on the fly
709  * @clnt: pointer to a struct rpc_clnt
710  * @args: pointer to the new transport arguments
711  * @timeout: pointer to the new timeout parameters
712  *
713  * This function allows the caller to switch the RPC transport for the
714  * rpc_clnt structure 'clnt' to allow it to connect to a mirrored NFS
715  * server, for instance.  It assumes that the caller has ensured that
716  * there are no active RPC tasks by using some form of locking.
717  *
718  * Returns zero if "clnt" is now using the new xprt.  Otherwise a
719  * negative errno is returned, and "clnt" continues to use the old
720  * xprt.
721  */
722 int rpc_switch_client_transport(struct rpc_clnt *clnt,
723                 struct xprt_create *args,
724                 const struct rpc_timeout *timeout)
725 {
726         const struct rpc_timeout *old_timeo;
727         rpc_authflavor_t pseudoflavor;
728         struct rpc_xprt_switch *xps, *oldxps;
729         struct rpc_xprt *xprt, *old;
730         struct rpc_clnt *parent;
731         int err;
732
733         xprt = xprt_create_transport(args);
734         if (IS_ERR(xprt))
735                 return PTR_ERR(xprt);
736
737         xps = xprt_switch_alloc(xprt, GFP_KERNEL);
738         if (xps == NULL) {
739                 xprt_put(xprt);
740                 return -ENOMEM;
741         }
742
743         pseudoflavor = clnt->cl_auth->au_flavor;
744
745         old_timeo = clnt->cl_timeout;
746         old = rpc_clnt_set_transport(clnt, xprt, timeout);
747         oldxps = xprt_iter_xchg_switch(&clnt->cl_xpi, xps);
748
749         rpc_unregister_client(clnt);
750         __rpc_clnt_remove_pipedir(clnt);
751         rpc_sysfs_client_destroy(clnt);
752         rpc_clnt_debugfs_unregister(clnt);
753
754         /*
755          * A new transport was created.  "clnt" therefore
756          * becomes the root of a new cl_parent tree.  clnt's
757          * children, if it has any, still point to the old xprt.
758          */
759         parent = clnt->cl_parent;
760         clnt->cl_parent = clnt;
761
762         /*
763          * The old rpc_auth cache cannot be re-used.  GSS
764          * contexts in particular are between a single
765          * client and server.
766          */
767         err = rpc_client_register(clnt, pseudoflavor, NULL);
768         if (err)
769                 goto out_revert;
770
771         synchronize_rcu();
772         if (parent != clnt)
773                 rpc_release_client(parent);
774         xprt_switch_put(oldxps);
775         xprt_put(old);
776         trace_rpc_clnt_replace_xprt(clnt);
777         return 0;
778
779 out_revert:
780         xps = xprt_iter_xchg_switch(&clnt->cl_xpi, oldxps);
781         rpc_clnt_set_transport(clnt, old, old_timeo);
782         clnt->cl_parent = parent;
783         rpc_client_register(clnt, pseudoflavor, NULL);
784         xprt_switch_put(xps);
785         xprt_put(xprt);
786         trace_rpc_clnt_replace_xprt_err(clnt);
787         return err;
788 }
789 EXPORT_SYMBOL_GPL(rpc_switch_client_transport);
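/*
 * Illustrative only: a sketch of moving a quiesced client to a new TCP
 * transport while keeping its current timeout parameters.  "new_addr"
 * and "new_addrlen" are hypothetical; the caller must guarantee that no
 * RPC tasks are active on "clnt" while this runs.
 *
 *	struct xprt_create xprtargs = {
 *		.ident		= XPRT_TRANSPORT_TCP,
 *		.net		= rpc_net_ns(clnt),
 *		.dstaddr	= (struct sockaddr *)&new_addr,
 *		.addrlen	= new_addrlen,
 *		.servername	= "mirror.example.org",
 *	};
 *	int err;
 *
 *	err = rpc_switch_client_transport(clnt, &xprtargs, clnt->cl_timeout);
 */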
790
791 static
792 int _rpc_clnt_xprt_iter_init(struct rpc_clnt *clnt, struct rpc_xprt_iter *xpi,
793                              void func(struct rpc_xprt_iter *xpi, struct rpc_xprt_switch *xps))
794 {
795         struct rpc_xprt_switch *xps;
796
797         rcu_read_lock();
798         xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
799         rcu_read_unlock();
800         if (xps == NULL)
801                 return -EAGAIN;
802         func(xpi, xps);
803         xprt_switch_put(xps);
804         return 0;
805 }
806
807 static
808 int rpc_clnt_xprt_iter_init(struct rpc_clnt *clnt, struct rpc_xprt_iter *xpi)
809 {
810         return _rpc_clnt_xprt_iter_init(clnt, xpi, xprt_iter_init_listall);
811 }
812
813 static
814 int rpc_clnt_xprt_iter_offline_init(struct rpc_clnt *clnt,
815                                     struct rpc_xprt_iter *xpi)
816 {
817         return _rpc_clnt_xprt_iter_init(clnt, xpi, xprt_iter_init_listoffline);
818 }
819
820 /**
821  * rpc_clnt_iterate_for_each_xprt - Apply a function to all transports
822  * @clnt: pointer to client
823  * @fn: function to apply
824  * @data: void pointer to function data
825  *
826  * Iterates through the list of RPC transports currently attached to the
827  * client and applies the function fn(clnt, xprt, data).
828  *
829  * On error, the iteration stops, and the function returns the error value.
830  */
831 int rpc_clnt_iterate_for_each_xprt(struct rpc_clnt *clnt,
832                 int (*fn)(struct rpc_clnt *, struct rpc_xprt *, void *),
833                 void *data)
834 {
835         struct rpc_xprt_iter xpi;
836         int ret;
837
838         ret = rpc_clnt_xprt_iter_init(clnt, &xpi);
839         if (ret)
840                 return ret;
841         for (;;) {
842                 struct rpc_xprt *xprt = xprt_iter_get_next(&xpi);
843
844                 if (!xprt)
845                         break;
846                 ret = fn(clnt, xprt, data);
847                 xprt_put(xprt);
848                 if (ret < 0)
849                         break;
850         }
851         xprt_iter_destroy(&xpi);
852         return ret;
853 }
854 EXPORT_SYMBOL_GPL(rpc_clnt_iterate_for_each_xprt);
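/*
 * Illustrative only: a trivial iterator callback matching the "fn"
 * signature above, here just counting the transports attached to a
 * client.  rpc_clnt_disconnect_xprt() further down in this file is a
 * real in-tree user of the same pattern.
 *
 *	static int count_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt,
 *			      void *data)
 *	{
 *		(*(unsigned int *)data)++;
 *		return 0;
 *	}
 *
 *	unsigned int nr = 0;
 *
 *	rpc_clnt_iterate_for_each_xprt(clnt, count_xprt, &nr);
 */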
855
856 /*
857  * Kill all tasks for the given client.
858  * XXX: kill their descendants as well?
859  */
860 void rpc_killall_tasks(struct rpc_clnt *clnt)
861 {
862         struct rpc_task *rovr;
863
864
865         if (list_empty(&clnt->cl_tasks))
866                 return;
867
868         /*
869          * Spin lock all_tasks to prevent changes...
870          */
871         trace_rpc_clnt_killall(clnt);
872         spin_lock(&clnt->cl_lock);
873         list_for_each_entry(rovr, &clnt->cl_tasks, tk_task)
874                 rpc_signal_task(rovr);
875         spin_unlock(&clnt->cl_lock);
876 }
877 EXPORT_SYMBOL_GPL(rpc_killall_tasks);
878
879 /**
880  * rpc_cancel_tasks - try to cancel a set of RPC tasks
881  * @clnt: Pointer to RPC client
882  * @error: RPC task error value to set
883  * @fnmatch: Pointer to selector function
884  * @data: User data
885  *
886  * Uses @fnmatch to define a set of RPC tasks that are to be cancelled.
887  * The argument @error must be a negative error value.
888  */
889 unsigned long rpc_cancel_tasks(struct rpc_clnt *clnt, int error,
890                                bool (*fnmatch)(const struct rpc_task *,
891                                                const void *),
892                                const void *data)
893 {
894         struct rpc_task *task;
895         unsigned long count = 0;
896
897         if (list_empty(&clnt->cl_tasks))
898                 return 0;
899         /*
900          * Spin lock all_tasks to prevent changes...
901          */
902         spin_lock(&clnt->cl_lock);
903         list_for_each_entry(task, &clnt->cl_tasks, tk_task) {
904                 if (!RPC_IS_ACTIVATED(task))
905                         continue;
906                 if (!fnmatch(task, data))
907                         continue;
908                 rpc_task_try_cancel(task, error);
909                 count++;
910         }
911         spin_unlock(&clnt->cl_lock);
912         return count;
913 }
914 EXPORT_SYMBOL_GPL(rpc_cancel_tasks);
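/*
 * Illustrative only: a selector for rpc_cancel_tasks() that matches
 * every activated task, cancelling them all with -EIO.  Real callers
 * usually match on per-task state (for example, tasks belonging to a
 * request that is being torn down) via the "data" cookie.
 *
 *	static bool match_all(const struct rpc_task *task, const void *data)
 *	{
 *		return true;
 *	}
 *
 *	unsigned long n = rpc_cancel_tasks(clnt, -EIO, match_all, NULL);
 */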
915
916 static int rpc_clnt_disconnect_xprt(struct rpc_clnt *clnt,
917                                     struct rpc_xprt *xprt, void *dummy)
918 {
919         if (xprt_connected(xprt))
920                 xprt_force_disconnect(xprt);
921         return 0;
922 }
923
924 void rpc_clnt_disconnect(struct rpc_clnt *clnt)
925 {
926         rpc_clnt_iterate_for_each_xprt(clnt, rpc_clnt_disconnect_xprt, NULL);
927 }
928 EXPORT_SYMBOL_GPL(rpc_clnt_disconnect);
929
930 /*
931  * Properly shut down an RPC client, terminating all outstanding
932  * requests.
933  */
934 void rpc_shutdown_client(struct rpc_clnt *clnt)
935 {
936         might_sleep();
937
938         trace_rpc_clnt_shutdown(clnt);
939
940         while (!list_empty(&clnt->cl_tasks)) {
941                 rpc_killall_tasks(clnt);
942                 wait_event_timeout(destroy_wait,
943                         list_empty(&clnt->cl_tasks), 1*HZ);
944         }
945
946         rpc_release_client(clnt);
947 }
948 EXPORT_SYMBOL_GPL(rpc_shutdown_client);
949
950 /*
951  * Free an RPC client
952  */
953 static void rpc_free_client_work(struct work_struct *work)
954 {
955         struct rpc_clnt *clnt = container_of(work, struct rpc_clnt, cl_work);
956
957         trace_rpc_clnt_free(clnt);
958
959         /* These might block on processes that might allocate memory,
960          * so they cannot be called in rpciod and are handled separately
961          * here.
962          */
963         rpc_sysfs_client_destroy(clnt);
964         rpc_clnt_debugfs_unregister(clnt);
965         rpc_free_clid(clnt);
966         rpc_clnt_remove_pipedir(clnt);
967         xprt_put(rcu_dereference_raw(clnt->cl_xprt));
968
969         kfree(clnt);
970         rpciod_down();
971 }
972 static struct rpc_clnt *
973 rpc_free_client(struct rpc_clnt *clnt)
974 {
975         struct rpc_clnt *parent = NULL;
976
977         trace_rpc_clnt_release(clnt);
978         if (clnt->cl_parent != clnt)
979                 parent = clnt->cl_parent;
980         rpc_unregister_client(clnt);
981         rpc_free_iostats(clnt->cl_metrics);
982         clnt->cl_metrics = NULL;
983         xprt_iter_destroy(&clnt->cl_xpi);
984         put_cred(clnt->cl_cred);
985
986         INIT_WORK(&clnt->cl_work, rpc_free_client_work);
987         schedule_work(&clnt->cl_work);
988         return parent;
989 }
990
991 /*
992  * Free an RPC client
993  */
994 static struct rpc_clnt *
995 rpc_free_auth(struct rpc_clnt *clnt)
996 {
997         /*
998          * Note: RPCSEC_GSS may need to send NULL RPC calls in order to
999          *       release remaining GSS contexts. This mechanism ensures
1000          *       that it can do so safely.
1001          */
1002         if (clnt->cl_auth != NULL) {
1003                 rpcauth_release(clnt->cl_auth);
1004                 clnt->cl_auth = NULL;
1005         }
1006         if (refcount_dec_and_test(&clnt->cl_count))
1007                 return rpc_free_client(clnt);
1008         return NULL;
1009 }
1010
1011 /*
1012  * Release reference to the RPC client
1013  */
1014 void
1015 rpc_release_client(struct rpc_clnt *clnt)
1016 {
1017         do {
1018                 if (list_empty(&clnt->cl_tasks))
1019                         wake_up(&destroy_wait);
1020                 if (refcount_dec_not_one(&clnt->cl_count))
1021                         break;
1022                 clnt = rpc_free_auth(clnt);
1023         } while (clnt != NULL);
1024 }
1025 EXPORT_SYMBOL_GPL(rpc_release_client);
1026
1027 /**
1028  * rpc_bind_new_program - bind a new RPC program to an existing client
1029  * @old: old rpc_client
1030  * @program: rpc program to set
1031  * @vers: rpc program version
1032  *
1033  * Clones the rpc client and sets up a new RPC program. This is mainly
1034  * of use for enabling different RPC programs to share the same transport.
1035  * The Sun NFSv2/v3 ACL protocol can do this.
1036  */
1037 struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
1038                                       const struct rpc_program *program,
1039                                       u32 vers)
1040 {
1041         struct rpc_create_args args = {
1042                 .program        = program,
1043                 .prognumber     = program->number,
1044                 .version        = vers,
1045                 .authflavor     = old->cl_auth->au_flavor,
1046                 .cred           = old->cl_cred,
1047         };
1048         struct rpc_clnt *clnt;
1049         int err;
1050
1051         clnt = __rpc_clone_client(&args, old);
1052         if (IS_ERR(clnt))
1053                 goto out;
1054         err = rpc_ping(clnt);
1055         if (err != 0) {
1056                 rpc_shutdown_client(clnt);
1057                 clnt = ERR_PTR(err);
1058         }
1059 out:
1060         return clnt;
1061 }
1062 EXPORT_SYMBOL_GPL(rpc_bind_new_program);
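/*
 * Illustrative only: binding a second program to the transport of an
 * existing NFS client, in the style of the NFSv3 ACL code.  The names
 * "nfs_client" and "acl_program" are placeholders for the caller's own
 * struct rpc_clnt and struct rpc_program.
 *
 *	struct rpc_clnt *acl_clnt;
 *
 *	acl_clnt = rpc_bind_new_program(nfs_client, &acl_program, 3);
 *	if (IS_ERR(acl_clnt))
 *		return PTR_ERR(acl_clnt);
 */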
1063
1064 struct rpc_xprt *
1065 rpc_task_get_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
1066 {
1067         struct rpc_xprt_switch *xps;
1068
1069         if (!xprt)
1070                 return NULL;
1071         rcu_read_lock();
1072         xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
1073         atomic_long_inc(&xps->xps_queuelen);
1074         rcu_read_unlock();
1075         atomic_long_inc(&xprt->queuelen);
1076
1077         return xprt;
1078 }
1079
1080 static void
1081 rpc_task_release_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
1082 {
1083         struct rpc_xprt_switch *xps;
1084
1085         atomic_long_dec(&xprt->queuelen);
1086         rcu_read_lock();
1087         xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
1088         atomic_long_dec(&xps->xps_queuelen);
1089         rcu_read_unlock();
1090
1091         xprt_put(xprt);
1092 }
1093
1094 void rpc_task_release_transport(struct rpc_task *task)
1095 {
1096         struct rpc_xprt *xprt = task->tk_xprt;
1097
1098         if (xprt) {
1099                 task->tk_xprt = NULL;
1100                 if (task->tk_client)
1101                         rpc_task_release_xprt(task->tk_client, xprt);
1102                 else
1103                         xprt_put(xprt);
1104         }
1105 }
1106 EXPORT_SYMBOL_GPL(rpc_task_release_transport);
1107
1108 void rpc_task_release_client(struct rpc_task *task)
1109 {
1110         struct rpc_clnt *clnt = task->tk_client;
1111
1112         rpc_task_release_transport(task);
1113         if (clnt != NULL) {
1114                 /* Remove from client task list */
1115                 spin_lock(&clnt->cl_lock);
1116                 list_del(&task->tk_task);
1117                 spin_unlock(&clnt->cl_lock);
1118                 task->tk_client = NULL;
1119
1120                 rpc_release_client(clnt);
1121         }
1122 }
1123
1124 static struct rpc_xprt *
1125 rpc_task_get_first_xprt(struct rpc_clnt *clnt)
1126 {
1127         struct rpc_xprt *xprt;
1128
1129         rcu_read_lock();
1130         xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
1131         rcu_read_unlock();
1132         return rpc_task_get_xprt(clnt, xprt);
1133 }
1134
1135 static struct rpc_xprt *
1136 rpc_task_get_next_xprt(struct rpc_clnt *clnt)
1137 {
1138         return rpc_task_get_xprt(clnt, xprt_iter_get_next(&clnt->cl_xpi));
1139 }
1140
1141 static
1142 void rpc_task_set_transport(struct rpc_task *task, struct rpc_clnt *clnt)
1143 {
1144         if (task->tk_xprt) {
1145                 if (!(test_bit(XPRT_OFFLINE, &task->tk_xprt->state) &&
1146                       (task->tk_flags & RPC_TASK_MOVEABLE)))
1147                         return;
1148                 xprt_release(task);
1149                 xprt_put(task->tk_xprt);
1150         }
1151         if (task->tk_flags & RPC_TASK_NO_ROUND_ROBIN)
1152                 task->tk_xprt = rpc_task_get_first_xprt(clnt);
1153         else
1154                 task->tk_xprt = rpc_task_get_next_xprt(clnt);
1155 }
1156
1157 static
1158 void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
1159 {
1160         rpc_task_set_transport(task, clnt);
1161         task->tk_client = clnt;
1162         refcount_inc(&clnt->cl_count);
1163         if (clnt->cl_softrtry)
1164                 task->tk_flags |= RPC_TASK_SOFT;
1165         if (clnt->cl_softerr)
1166                 task->tk_flags |= RPC_TASK_TIMEOUT;
1167         if (clnt->cl_noretranstimeo)
1168                 task->tk_flags |= RPC_TASK_NO_RETRANS_TIMEOUT;
1169         /* Add to the client's list of all tasks */
1170         spin_lock(&clnt->cl_lock);
1171         list_add_tail(&task->tk_task, &clnt->cl_tasks);
1172         spin_unlock(&clnt->cl_lock);
1173 }
1174
1175 static void
1176 rpc_task_set_rpc_message(struct rpc_task *task, const struct rpc_message *msg)
1177 {
1178         if (msg != NULL) {
1179                 task->tk_msg.rpc_proc = msg->rpc_proc;
1180                 task->tk_msg.rpc_argp = msg->rpc_argp;
1181                 task->tk_msg.rpc_resp = msg->rpc_resp;
1182                 task->tk_msg.rpc_cred = msg->rpc_cred;
1183                 if (!(task->tk_flags & RPC_TASK_CRED_NOREF))
1184                         get_cred(task->tk_msg.rpc_cred);
1185         }
1186 }
1187
1188 /*
1189  * Default callback for async RPC calls
1190  */
1191 static void
1192 rpc_default_callback(struct rpc_task *task, void *data)
1193 {
1194 }
1195
1196 static const struct rpc_call_ops rpc_default_ops = {
1197         .rpc_call_done = rpc_default_callback,
1198 };
1199
1200 /**
1201  * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
1202  * @task_setup_data: pointer to task initialisation data
1203  */
1204 struct rpc_task *rpc_run_task(const struct rpc_task_setup *task_setup_data)
1205 {
1206         struct rpc_task *task;
1207
1208         task = rpc_new_task(task_setup_data);
1209         if (IS_ERR(task))
1210                 return task;
1211
1212         if (!RPC_IS_ASYNC(task))
1213                 task->tk_flags |= RPC_TASK_CRED_NOREF;
1214
1215         rpc_task_set_client(task, task_setup_data->rpc_client);
1216         rpc_task_set_rpc_message(task, task_setup_data->rpc_message);
1217
1218         if (task->tk_action == NULL)
1219                 rpc_call_start(task);
1220
1221         atomic_inc(&task->tk_count);
1222         rpc_execute(task);
1223         return task;
1224 }
1225 EXPORT_SYMBOL_GPL(rpc_run_task);
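/*
 * Illustrative only: driving rpc_run_task() directly for a synchronous
 * call.  "clnt", "msg" and "my_call_ops" (the caller's own struct
 * rpc_call_ops) are assumed to exist; rpc_call_sync() below does the
 * same thing on the caller's behalf.
 *
 *	struct rpc_task_setup setup = {
 *		.rpc_client	= clnt,
 *		.rpc_message	= &msg,
 *		.callback_ops	= &my_call_ops,
 *		.flags		= RPC_TASK_SOFT,
 *	};
 *	struct rpc_task *task;
 *	int status;
 *
 *	task = rpc_run_task(&setup);
 *	if (IS_ERR(task))
 *		return PTR_ERR(task);
 *	status = task->tk_status;
 *	rpc_put_task(task);
 */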
1226
1227 /**
1228  * rpc_call_sync - Perform a synchronous RPC call
1229  * @clnt: pointer to RPC client
1230  * @msg: RPC call parameters
1231  * @flags: RPC call flags
1232  */
1233 int rpc_call_sync(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags)
1234 {
1235         struct rpc_task *task;
1236         struct rpc_task_setup task_setup_data = {
1237                 .rpc_client = clnt,
1238                 .rpc_message = msg,
1239                 .callback_ops = &rpc_default_ops,
1240                 .flags = flags,
1241         };
1242         int status;
1243
1244         WARN_ON_ONCE(flags & RPC_TASK_ASYNC);
1245         if (flags & RPC_TASK_ASYNC) {
1246                 rpc_release_calldata(task_setup_data.callback_ops,
1247                         task_setup_data.callback_data);
1248                 return -EINVAL;
1249         }
1250
1251         task = rpc_run_task(&task_setup_data);
1252         if (IS_ERR(task))
1253                 return PTR_ERR(task);
1254         status = task->tk_status;
1255         rpc_put_task(task);
1256         return status;
1257 }
1258 EXPORT_SYMBOL_GPL(rpc_call_sync);
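/*
 * Illustrative only: issuing a single synchronous call.  "my_procinfo",
 * "args" and "res" stand in for a real struct rpc_procinfo and its
 * argument/result structures.
 *
 *	struct rpc_message msg = {
 *		.rpc_proc	= &my_procinfo,
 *		.rpc_argp	= &args,
 *		.rpc_resp	= &res,
 *	};
 *	int status;
 *
 *	status = rpc_call_sync(clnt, &msg, RPC_TASK_SOFT);
 */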
1259
1260 /**
1261  * rpc_call_async - Perform an asynchronous RPC call
1262  * @clnt: pointer to RPC client
1263  * @msg: RPC call parameters
1264  * @flags: RPC call flags
1265  * @tk_ops: RPC call ops
1266  * @data: user call data
1267  */
1268 int
1269 rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
1270                const struct rpc_call_ops *tk_ops, void *data)
1271 {
1272         struct rpc_task *task;
1273         struct rpc_task_setup task_setup_data = {
1274                 .rpc_client = clnt,
1275                 .rpc_message = msg,
1276                 .callback_ops = tk_ops,
1277                 .callback_data = data,
1278                 .flags = flags|RPC_TASK_ASYNC,
1279         };
1280
1281         task = rpc_run_task(&task_setup_data);
1282         if (IS_ERR(task))
1283                 return PTR_ERR(task);
1284         rpc_put_task(task);
1285         return 0;
1286 }
1287 EXPORT_SYMBOL_GPL(rpc_call_async);
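/*
 * Illustrative only: the caller supplies completion callbacks via
 * struct rpc_call_ops; .rpc_call_done typically runs in rpciod context
 * when the call completes.  "my_done", "my_release", "msg" and
 * "calldata" are hypothetical.
 *
 *	static const struct rpc_call_ops my_call_ops = {
 *		.rpc_call_done	= my_done,
 *		.rpc_release	= my_release,
 *	};
 *
 *	err = rpc_call_async(clnt, &msg, RPC_TASK_SOFT, &my_call_ops, calldata);
 */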
1288
1289 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
1290 static void call_bc_encode(struct rpc_task *task);
1291
1292 /**
1293  * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run
1294  * rpc_execute against it
1295  * @req: RPC request
1296  */
1297 struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req)
1298 {
1299         struct rpc_task *task;
1300         struct rpc_task_setup task_setup_data = {
1301                 .callback_ops = &rpc_default_ops,
1302                 .flags = RPC_TASK_SOFTCONN |
1303                         RPC_TASK_NO_RETRANS_TIMEOUT,
1304         };
1305
1306         dprintk("RPC: rpc_run_bc_task req= %p\n", req);
1307         /*
1308          * Create an rpc_task to send the data
1309          */
1310         task = rpc_new_task(&task_setup_data);
1311         if (IS_ERR(task)) {
1312                 xprt_free_bc_request(req);
1313                 return task;
1314         }
1315
1316         xprt_init_bc_request(req, task);
1317
1318         task->tk_action = call_bc_encode;
1319         atomic_inc(&task->tk_count);
1320         WARN_ON_ONCE(atomic_read(&task->tk_count) != 2);
1321         rpc_execute(task);
1322
1323         dprintk("RPC: rpc_run_bc_task: task= %p\n", task);
1324         return task;
1325 }
1326 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
1327
1328 /**
1329  * rpc_prepare_reply_pages - Prepare to receive a reply data payload into pages
1330  * @req: RPC request to prepare
1331  * @pages: vector of struct page pointers
1332  * @base: offset in first page where receive should start, in bytes
1333  * @len: expected size of the upper layer data payload, in bytes
1334  * @hdrsize: expected size of upper layer reply header, in XDR words
1335  *
1336  */
1337 void rpc_prepare_reply_pages(struct rpc_rqst *req, struct page **pages,
1338                              unsigned int base, unsigned int len,
1339                              unsigned int hdrsize)
1340 {
1341         hdrsize += RPC_REPHDRSIZE + req->rq_cred->cr_auth->au_ralign;
1342
1343         xdr_inline_pages(&req->rq_rcv_buf, hdrsize << 2, pages, base, len);
1344         trace_rpc_xdr_reply_pages(req->rq_task, &req->rq_rcv_buf);
1345 }
1346 EXPORT_SYMBOL_GPL(rpc_prepare_reply_pages);
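/*
 * Illustrative only: an XDR encode routine for a READ-style procedure
 * might set up the receive buffer like this, where "args" carries the
 * upper layer's page vector and "hdr_xdr_words" is the expected reply
 * header size in XDR words (all names here are placeholders).
 *
 *	rpc_prepare_reply_pages(req, args->pages, args->pgbase,
 *				args->count, hdr_xdr_words);
 */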
1347
1348 void
1349 rpc_call_start(struct rpc_task *task)
1350 {
1351         task->tk_action = call_start;
1352 }
1353 EXPORT_SYMBOL_GPL(rpc_call_start);
1354
1355 /**
1356  * rpc_peeraddr - extract remote peer address from clnt's xprt
1357  * @clnt: RPC client structure
1358  * @buf: target buffer
1359  * @bufsize: length of target buffer
1360  *
1361  * Returns the number of bytes that are actually in the stored address.
1362  */
1363 size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
1364 {
1365         size_t bytes;
1366         struct rpc_xprt *xprt;
1367
1368         rcu_read_lock();
1369         xprt = rcu_dereference(clnt->cl_xprt);
1370
1371         bytes = xprt->addrlen;
1372         if (bytes > bufsize)
1373                 bytes = bufsize;
1374         memcpy(buf, &xprt->addr, bytes);
1375         rcu_read_unlock();
1376
1377         return bytes;
1378 }
1379 EXPORT_SYMBOL_GPL(rpc_peeraddr);
1380
1381 /**
1382  * rpc_peeraddr2str - return remote peer address in printable format
1383  * @clnt: RPC client structure
1384  * @format: address format
1385  *
1386  * NB: the lifetime of the memory referenced by the returned pointer is
1387  * the same as the rpc_xprt itself.  As long as the caller uses this
1388  * pointer, it must hold the RCU read lock.
1389  */
1390 const char *rpc_peeraddr2str(struct rpc_clnt *clnt,
1391                              enum rpc_display_format_t format)
1392 {
1393         struct rpc_xprt *xprt;
1394
1395         xprt = rcu_dereference(clnt->cl_xprt);
1396
1397         if (xprt->address_strings[format] != NULL)
1398                 return xprt->address_strings[format];
1399         else
1400                 return "unprintable";
1401 }
1402 EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
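/*
 * Illustrative only: because the returned string lives in the rpc_xprt,
 * callers hold the RCU read lock across its use.
 *
 *	rcu_read_lock();
 *	pr_info("server is %s\n", rpc_peeraddr2str(clnt, RPC_DISPLAY_ADDR));
 *	rcu_read_unlock();
 */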
1403
1404 static const struct sockaddr_in rpc_inaddr_loopback = {
1405         .sin_family             = AF_INET,
1406         .sin_addr.s_addr        = htonl(INADDR_ANY),
1407 };
1408
1409 static const struct sockaddr_in6 rpc_in6addr_loopback = {
1410         .sin6_family            = AF_INET6,
1411         .sin6_addr              = IN6ADDR_ANY_INIT,
1412 };
1413
1414 /*
1415  * Try a getsockname() on a connected datagram socket.  Using a
1416  * connected datagram socket prevents leaving a socket in TIME_WAIT.
1417  * This conserves the ephemeral port number space.
1418  *
1419  * Returns zero and fills in "buf" if successful; otherwise, a
1420  * negative errno is returned.
1421  */
1422 static int rpc_sockname(struct net *net, struct sockaddr *sap, size_t salen,
1423                         struct sockaddr *buf)
1424 {
1425         struct socket *sock;
1426         int err;
1427
1428         err = __sock_create(net, sap->sa_family,
1429                                 SOCK_DGRAM, IPPROTO_UDP, &sock, 1);
1430         if (err < 0) {
1431                 dprintk("RPC:       can't create UDP socket (%d)\n", err);
1432                 goto out;
1433         }
1434
1435         switch (sap->sa_family) {
1436         case AF_INET:
1437                 err = kernel_bind(sock,
1438                                 (struct sockaddr *)&rpc_inaddr_loopback,
1439                                 sizeof(rpc_inaddr_loopback));
1440                 break;
1441         case AF_INET6:
1442                 err = kernel_bind(sock,
1443                                 (struct sockaddr *)&rpc_in6addr_loopback,
1444                                 sizeof(rpc_in6addr_loopback));
1445                 break;
1446         default:
1447                 err = -EAFNOSUPPORT;
1448                 goto out_release;
1449         }
1450         if (err < 0) {
1451                 dprintk("RPC:       can't bind UDP socket (%d)\n", err);
1452                 goto out_release;
1453         }
1454
1455         err = kernel_connect(sock, sap, salen, 0);
1456         if (err < 0) {
1457                 dprintk("RPC:       can't connect UDP socket (%d)\n", err);
1458                 goto out_release;
1459         }
1460
1461         err = kernel_getsockname(sock, buf);
1462         if (err < 0) {
1463                 dprintk("RPC:       getsockname failed (%d)\n", err);
1464                 goto out_release;
1465         }
1466
1467         err = 0;
1468         if (buf->sa_family == AF_INET6) {
1469                 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)buf;
1470                 sin6->sin6_scope_id = 0;
1471         }
1472         dprintk("RPC:       %s succeeded\n", __func__);
1473
1474 out_release:
1475         sock_release(sock);
1476 out:
1477         return err;
1478 }
1479
1480 /*
1481  * Scraping a connected socket failed, so we don't have a usable
1482  * local address.  Fallback: generate an address that will prevent
1483  * the server from calling us back.
1484  *
1485  * Returns zero and fills in "buf" if successful; otherwise, a
1486  * negative errno is returned.
1487  */
1488 static int rpc_anyaddr(int family, struct sockaddr *buf, size_t buflen)
1489 {
1490         switch (family) {
1491         case AF_INET:
1492                 if (buflen < sizeof(rpc_inaddr_loopback))
1493                         return -EINVAL;
1494                 memcpy(buf, &rpc_inaddr_loopback,
1495                                 sizeof(rpc_inaddr_loopback));
1496                 break;
1497         case AF_INET6:
1498                 if (buflen < sizeof(rpc_in6addr_loopback))
1499                         return -EINVAL;
1500                 memcpy(buf, &rpc_in6addr_loopback,
1501                                 sizeof(rpc_in6addr_loopback));
1502                 break;
1503         default:
1504                 dprintk("RPC:       %s: address family not supported\n",
1505                         __func__);
1506                 return -EAFNOSUPPORT;
1507         }
1508         dprintk("RPC:       %s: succeeded\n", __func__);
1509         return 0;
1510 }
1511
1512 /**
1513  * rpc_localaddr - discover local endpoint address for an RPC client
1514  * @clnt: RPC client structure
1515  * @buf: target buffer
1516  * @buflen: size of target buffer, in bytes
1517  *
1518  * Returns zero and fills in "buf" and "buflen" if successful;
1519  * otherwise, a negative errno is returned.
1520  *
1521  * This works even if the underlying transport is not currently connected,
1522  * or if the upper layer never previously provided a source address.
1523  *
1524  * The result of this function call is transient: multiple calls in
1525  * succession may give different results, depending on how local
1526  * networking configuration changes over time.
1527  */
1528 int rpc_localaddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t buflen)
1529 {
1530         struct sockaddr_storage address;
1531         struct sockaddr *sap = (struct sockaddr *)&address;
1532         struct rpc_xprt *xprt;
1533         struct net *net;
1534         size_t salen;
1535         int err;
1536
1537         rcu_read_lock();
1538         xprt = rcu_dereference(clnt->cl_xprt);
1539         salen = xprt->addrlen;
1540         memcpy(sap, &xprt->addr, salen);
1541         net = get_net(xprt->xprt_net);
1542         rcu_read_unlock();
1543
1544         rpc_set_port(sap, 0);
1545         err = rpc_sockname(net, sap, salen, buf);
1546         put_net(net);
1547         if (err != 0)
1548                 /* Couldn't discover local address, return ANYADDR */
1549                 return rpc_anyaddr(sap->sa_family, buf, buflen);
1550         return 0;
1551 }
1552 EXPORT_SYMBOL_GPL(rpc_localaddr);
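/*
 * Illustrative only: discovering the local address used to reach the
 * server, for example to advertise a callback service.
 *
 *	struct sockaddr_storage addr;
 *	int err;
 *
 *	err = rpc_localaddr(clnt, (struct sockaddr *)&addr, sizeof(addr));
 */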
1553
1554 void
1555 rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
1556 {
1557         struct rpc_xprt *xprt;
1558
1559         rcu_read_lock();
1560         xprt = rcu_dereference(clnt->cl_xprt);
1561         if (xprt->ops->set_buffer_size)
1562                 xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
1563         rcu_read_unlock();
1564 }
1565 EXPORT_SYMBOL_GPL(rpc_setbufsize);
1566
1567 /**
1568  * rpc_net_ns - Get the network namespace for this RPC client
1569  * @clnt: RPC client to query
1570  *
1571  */
1572 struct net *rpc_net_ns(struct rpc_clnt *clnt)
1573 {
1574         struct net *ret;
1575
1576         rcu_read_lock();
1577         ret = rcu_dereference(clnt->cl_xprt)->xprt_net;
1578         rcu_read_unlock();
1579         return ret;
1580 }
1581 EXPORT_SYMBOL_GPL(rpc_net_ns);
1582
1583 /**
1584  * rpc_max_payload - Get maximum payload size for a transport, in bytes
1585  * @clnt: RPC client to query
1586  *
1587  * For stream transports, this is one RPC record fragment (see RFC
1588  * 1831), as we don't support multi-record requests yet.  For datagram
1589  * transports, this is the size of an IP packet minus the IP, UDP, and
1590  * RPC header sizes.
1591  */
1592 size_t rpc_max_payload(struct rpc_clnt *clnt)
1593 {
1594         size_t ret;
1595
1596         rcu_read_lock();
1597         ret = rcu_dereference(clnt->cl_xprt)->max_payload;
1598         rcu_read_unlock();
1599         return ret;
1600 }
1601 EXPORT_SYMBOL_GPL(rpc_max_payload);
1602
1603 /**
1604  * rpc_max_bc_payload - Get maximum backchannel payload size, in bytes
1605  * @clnt: RPC client to query
1606  */
1607 size_t rpc_max_bc_payload(struct rpc_clnt *clnt)
1608 {
1609         struct rpc_xprt *xprt;
1610         size_t ret;
1611
1612         rcu_read_lock();
1613         xprt = rcu_dereference(clnt->cl_xprt);
1614         ret = xprt->ops->bc_maxpayload(xprt);
1615         rcu_read_unlock();
1616         return ret;
1617 }
1618 EXPORT_SYMBOL_GPL(rpc_max_bc_payload);
1619
1620 unsigned int rpc_num_bc_slots(struct rpc_clnt *clnt)
1621 {
1622         struct rpc_xprt *xprt;
1623         unsigned int ret;
1624
1625         rcu_read_lock();
1626         xprt = rcu_dereference(clnt->cl_xprt);
1627         ret = xprt->ops->bc_num_slots(xprt);
1628         rcu_read_unlock();
1629         return ret;
1630 }
1631 EXPORT_SYMBOL_GPL(rpc_num_bc_slots);
1632
1633 /**
1634  * rpc_force_rebind - force transport to check that remote port is unchanged
1635  * @clnt: client to rebind
1636  *
1637  */
1638 void rpc_force_rebind(struct rpc_clnt *clnt)
1639 {
1640         if (clnt->cl_autobind) {
1641                 rcu_read_lock();
1642                 xprt_clear_bound(rcu_dereference(clnt->cl_xprt));
1643                 rcu_read_unlock();
1644         }
1645 }
1646 EXPORT_SYMBOL_GPL(rpc_force_rebind);
1647
1648 static int
1649 __rpc_restart_call(struct rpc_task *task, void (*action)(struct rpc_task *))
1650 {
1651         task->tk_status = 0;
1652         task->tk_rpc_status = 0;
1653         task->tk_action = action;
1654         return 1;
1655 }
1656
1657 /*
1658  * Restart an (async) RPC call. Usually called from within the
1659  * exit handler.
1660  */
1661 int
1662 rpc_restart_call(struct rpc_task *task)
1663 {
1664         return __rpc_restart_call(task, call_start);
1665 }
1666 EXPORT_SYMBOL_GPL(rpc_restart_call);
1667
1668 /*
1669  * Restart an (async) RPC call from the call_prepare state.
1670  * Usually called from within the exit handler.
1671  */
1672 int
1673 rpc_restart_call_prepare(struct rpc_task *task)
1674 {
1675         if (task->tk_ops->rpc_call_prepare != NULL)
1676                 return __rpc_restart_call(task, rpc_prepare_task);
1677         return rpc_restart_call(task);
1678 }
1679 EXPORT_SYMBOL_GPL(rpc_restart_call_prepare);
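
/*
 * Illustrative sketch (editorial addition): the restart helpers above are
 * normally invoked from an rpc_call_done callback when the caller decides
 * the request should be retried from the beginning.  A hypothetical
 * callback might look like this:
 *
 *	static void my_call_done(struct rpc_task *task, void *calldata)
 *	{
 *		if (task->tk_status == -EAGAIN) {
 *			rpc_delay(task, HZ);
 *			rpc_restart_call_prepare(task);
 *		}
 *	}
 */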
1680
1681 const char
1682 *rpc_proc_name(const struct rpc_task *task)
1683 {
1684         const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1685
1686         if (proc) {
1687                 if (proc->p_name)
1688                         return proc->p_name;
1689                 else
1690                         return "NULL";
1691         } else
1692                 return "no proc";
1693 }
1694
1695 static void
1696 __rpc_call_rpcerror(struct rpc_task *task, int tk_status, int rpc_status)
1697 {
1698         trace_rpc_call_rpcerror(task, tk_status, rpc_status);
1699         rpc_task_set_rpc_status(task, rpc_status);
1700         rpc_exit(task, tk_status);
1701 }
1702
1703 static void
1704 rpc_call_rpcerror(struct rpc_task *task, int status)
1705 {
1706         __rpc_call_rpcerror(task, status, status);
1707 }
1708
1709 /*
1710  * 0.  Initial state
1711  *
1712  *     Other FSM states can be visited zero or more times, but
1713  *     this state is visited exactly once for each RPC.
1714  */
1715 static void
1716 call_start(struct rpc_task *task)
1717 {
1718         struct rpc_clnt *clnt = task->tk_client;
1719         int idx = task->tk_msg.rpc_proc->p_statidx;
1720
1721         trace_rpc_request(task);
1722
1723         /* Increment call count (version might not be valid for ping) */
1724         if (clnt->cl_program->version[clnt->cl_vers])
1725                 clnt->cl_program->version[clnt->cl_vers]->counts[idx]++;
1726         clnt->cl_stats->rpccnt++;
1727         task->tk_action = call_reserve;
1728         rpc_task_set_transport(task, clnt);
1729 }
1730
1731 /*
1732  * 1.   Reserve an RPC call slot
1733  */
1734 static void
1735 call_reserve(struct rpc_task *task)
1736 {
1737         task->tk_status  = 0;
1738         task->tk_action  = call_reserveresult;
1739         xprt_reserve(task);
1740 }
1741
1742 static void call_retry_reserve(struct rpc_task *task);
1743
1744 /*
1745  * 1b.  Grok the result of xprt_reserve()
1746  */
1747 static void
1748 call_reserveresult(struct rpc_task *task)
1749 {
1750         int status = task->tk_status;
1751
1752         /*
1753          * After a call to xprt_reserve(), we must have either
1754          * a request slot or else an error status.
1755          */
1756         task->tk_status = 0;
1757         if (status >= 0) {
1758                 if (task->tk_rqstp) {
1759                         task->tk_action = call_refresh;
1760                         return;
1761                 }
1762
1763                 rpc_call_rpcerror(task, -EIO);
1764                 return;
1765         }
1766
1767         switch (status) {
1768         case -ENOMEM:
1769                 rpc_delay(task, HZ >> 2);
1770                 fallthrough;
1771         case -EAGAIN:   /* woken up; retry */
1772                 task->tk_action = call_retry_reserve;
1773                 return;
1774         default:
1775                 rpc_call_rpcerror(task, status);
1776         }
1777 }
1778
1779 /*
1780  * 1c.  Retry reserving an RPC call slot
1781  */
1782 static void
1783 call_retry_reserve(struct rpc_task *task)
1784 {
1785         task->tk_status  = 0;
1786         task->tk_action  = call_reserveresult;
1787         xprt_retry_reserve(task);
1788 }
1789
1790 /*
1791  * 2.   Bind and/or refresh the credentials
1792  */
1793 static void
1794 call_refresh(struct rpc_task *task)
1795 {
1796         task->tk_action = call_refreshresult;
1797         task->tk_status = 0;
1798         task->tk_client->cl_stats->rpcauthrefresh++;
1799         rpcauth_refreshcred(task);
1800 }
1801
1802 /*
1803  * 2a.  Process the results of a credential refresh
1804  */
1805 static void
1806 call_refreshresult(struct rpc_task *task)
1807 {
1808         int status = task->tk_status;
1809
1810         task->tk_status = 0;
1811         task->tk_action = call_refresh;
1812         switch (status) {
1813         case 0:
1814                 if (rpcauth_uptodatecred(task)) {
1815                         task->tk_action = call_allocate;
1816                         return;
1817                 }
1818                 /* Use rate-limiting and a max number of retries if refresh
1819                  * had status 0 but failed to update the cred.
1820                  */
1821                 fallthrough;
1822         case -ETIMEDOUT:
1823                 rpc_delay(task, 3*HZ);
1824                 fallthrough;
1825         case -EAGAIN:
1826                 status = -EACCES;
1827                 fallthrough;
1828         case -EKEYEXPIRED:
1829                 if (!task->tk_cred_retry)
1830                         break;
1831                 task->tk_cred_retry--;
1832                 trace_rpc_retry_refresh_status(task);
1833                 return;
1834         case -ENOMEM:
1835                 rpc_delay(task, HZ >> 4);
1836                 return;
1837         }
1838         trace_rpc_refresh_status(task);
1839         rpc_call_rpcerror(task, status);
1840 }
1841
1842 /*
1843  * 2b.  Allocate the buffer. For details, see sched.c:rpc_malloc.
1844  *      (Note: buffer memory is freed in xprt_release).
1845  */
1846 static void
1847 call_allocate(struct rpc_task *task)
1848 {
1849         const struct rpc_auth *auth = task->tk_rqstp->rq_cred->cr_auth;
1850         struct rpc_rqst *req = task->tk_rqstp;
1851         struct rpc_xprt *xprt = req->rq_xprt;
1852         const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1853         int status;
1854
1855         task->tk_status = 0;
1856         task->tk_action = call_encode;
1857
1858         if (req->rq_buffer)
1859                 return;
1860
1861         if (proc->p_proc != 0) {
1862                 BUG_ON(proc->p_arglen == 0);
1863                 if (proc->p_decode != NULL)
1864                         BUG_ON(proc->p_replen == 0);
1865         }
1866
1867         /*
1868          * Calculate the size (in quads) of the RPC call
1869          * and reply headers, and convert both values
1870          * to byte sizes.
1871          */
1872         req->rq_callsize = RPC_CALLHDRSIZE + (auth->au_cslack << 1) +
1873                            proc->p_arglen;
1874         req->rq_callsize <<= 2;
1875         /*
1876          * Note: the reply buffer must at minimum allocate enough space
1877          * for the 'struct accepted_reply' from RFC5531.
1878          */
1879         req->rq_rcvsize = RPC_REPHDRSIZE + auth->au_rslack + \
1880                         max_t(size_t, proc->p_replen, 2);
1881         req->rq_rcvsize <<= 2;
1882
1883         status = xprt->ops->buf_alloc(task);
1884         trace_rpc_buf_alloc(task, status);
1885         if (status == 0)
1886                 return;
1887         if (status != -ENOMEM) {
1888                 rpc_call_rpcerror(task, status);
1889                 return;
1890         }
1891
1892         if (RPC_IS_ASYNC(task) || !fatal_signal_pending(current)) {
1893                 task->tk_action = call_allocate;
1894                 rpc_delay(task, HZ>>4);
1895                 return;
1896         }
1897
1898         rpc_call_rpcerror(task, -ERESTARTSYS);
1899 }
1900
1901 static int
1902 rpc_task_need_encode(struct rpc_task *task)
1903 {
1904         return test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) == 0 &&
1905                 (!(task->tk_flags & RPC_TASK_SENT) ||
1906                  !(task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) ||
1907                  xprt_request_need_retransmit(task));
1908 }
1909
1910 static void
1911 rpc_xdr_encode(struct rpc_task *task)
1912 {
1913         struct rpc_rqst *req = task->tk_rqstp;
1914         struct xdr_stream xdr;
1915
1916         xdr_buf_init(&req->rq_snd_buf,
1917                      req->rq_buffer,
1918                      req->rq_callsize);
1919         xdr_buf_init(&req->rq_rcv_buf,
1920                      req->rq_rbuffer,
1921                      req->rq_rcvsize);
1922
1923         req->rq_reply_bytes_recvd = 0;
1924         req->rq_snd_buf.head[0].iov_len = 0;
1925         xdr_init_encode(&xdr, &req->rq_snd_buf,
1926                         req->rq_snd_buf.head[0].iov_base, req);
1927         if (rpc_encode_header(task, &xdr))
1928                 return;
1929
1930         task->tk_status = rpcauth_wrap_req(task, &xdr);
1931 }
1932
1933 /*
1934  * 3.   Encode arguments of an RPC call
1935  */
1936 static void
1937 call_encode(struct rpc_task *task)
1938 {
1939         if (!rpc_task_need_encode(task))
1940                 goto out;
1941
1942         /* Dequeue task from the receive queue while we're encoding */
1943         xprt_request_dequeue_xprt(task);
1944         /* Encode here so that rpcsec_gss can use correct sequence number. */
1945         rpc_xdr_encode(task);
1946         /* Add task to reply queue before transmission to avoid races */
1947         if (task->tk_status == 0 && rpc_reply_expected(task))
1948                 task->tk_status = xprt_request_enqueue_receive(task);
1949         /* Did the encode result in an error condition? */
1950         if (task->tk_status != 0) {
1951                 /* Was the error nonfatal? */
1952                 switch (task->tk_status) {
1953                 case -EAGAIN:
1954                 case -ENOMEM:
1955                         rpc_delay(task, HZ >> 4);
1956                         break;
1957                 case -EKEYEXPIRED:
1958                         if (!task->tk_cred_retry) {
1959                                 rpc_call_rpcerror(task, task->tk_status);
1960                         } else {
1961                                 task->tk_action = call_refresh;
1962                                 task->tk_cred_retry--;
1963                                 trace_rpc_retry_refresh_status(task);
1964                         }
1965                         break;
1966                 default:
1967                         rpc_call_rpcerror(task, task->tk_status);
1968                 }
1969                 return;
1970         }
1971
1972         xprt_request_enqueue_transmit(task);
1973 out:
1974         task->tk_action = call_transmit;
1975         /* Check that the connection is OK */
1976         if (!xprt_bound(task->tk_xprt))
1977                 task->tk_action = call_bind;
1978         else if (!xprt_connected(task->tk_xprt))
1979                 task->tk_action = call_connect;
1980 }
1981
1982 /*
1983  * Helpers to check if the task was already transmitted, and
1984  * to take action when that is the case.
1985  */
1986 static bool
1987 rpc_task_transmitted(struct rpc_task *task)
1988 {
1989         return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
1990 }
1991
1992 static void
1993 rpc_task_handle_transmitted(struct rpc_task *task)
1994 {
1995         xprt_end_transmit(task);
1996         task->tk_action = call_transmit_status;
1997 }
1998
1999 /*
2000  * 4.   Get the server port number if not yet set
2001  */
2002 static void
2003 call_bind(struct rpc_task *task)
2004 {
2005         struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
2006
2007         if (rpc_task_transmitted(task)) {
2008                 rpc_task_handle_transmitted(task);
2009                 return;
2010         }
2011
2012         if (xprt_bound(xprt)) {
2013                 task->tk_action = call_connect;
2014                 return;
2015         }
2016
2017         task->tk_action = call_bind_status;
2018         if (!xprt_prepare_transmit(task))
2019                 return;
2020
2021         xprt->ops->rpcbind(task);
2022 }
2023
2024 /*
2025  * 4a.  Sort out bind result
2026  */
2027 static void
2028 call_bind_status(struct rpc_task *task)
2029 {
2030         struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
2031         int status = -EIO;
2032
2033         if (rpc_task_transmitted(task)) {
2034                 rpc_task_handle_transmitted(task);
2035                 return;
2036         }
2037
2038         if (task->tk_status >= 0)
2039                 goto out_next;
2040         if (xprt_bound(xprt)) {
2041                 task->tk_status = 0;
2042                 goto out_next;
2043         }
2044
2045         switch (task->tk_status) {
2046         case -ENOMEM:
2047                 rpc_delay(task, HZ >> 2);
2048                 goto retry_timeout;
2049         case -EACCES:
2050                 trace_rpcb_prog_unavail_err(task);
2051                 /* fail immediately if this is an RPC ping */
2052                 if (task->tk_msg.rpc_proc->p_proc == 0) {
2053                         status = -EOPNOTSUPP;
2054                         break;
2055                 }
2056                 rpc_delay(task, 3*HZ);
2057                 goto retry_timeout;
2058         case -ENOBUFS:
2059                 rpc_delay(task, HZ >> 2);
2060                 goto retry_timeout;
2061         case -EAGAIN:
2062                 goto retry_timeout;
2063         case -ETIMEDOUT:
2064                 trace_rpcb_timeout_err(task);
2065                 goto retry_timeout;
2066         case -EPFNOSUPPORT:
2067                 /* server doesn't support any rpcbind version we know of */
2068                 trace_rpcb_bind_version_err(task);
2069                 break;
2070         case -EPROTONOSUPPORT:
2071                 trace_rpcb_bind_version_err(task);
2072                 goto retry_timeout;
2073         case -ECONNREFUSED:             /* connection problems */
2074         case -ECONNRESET:
2075         case -ECONNABORTED:
2076         case -ENOTCONN:
2077         case -EHOSTDOWN:
2078         case -ENETDOWN:
2079         case -EHOSTUNREACH:
2080         case -ENETUNREACH:
2081         case -EPIPE:
2082                 trace_rpcb_unreachable_err(task);
2083                 if (!RPC_IS_SOFTCONN(task)) {
2084                         rpc_delay(task, 5*HZ);
2085                         goto retry_timeout;
2086                 }
2087                 status = task->tk_status;
2088                 break;
2089         default:
2090                 trace_rpcb_unrecognized_err(task);
2091         }
2092
2093         rpc_call_rpcerror(task, status);
2094         return;
2095 out_next:
2096         task->tk_action = call_connect;
2097         return;
2098 retry_timeout:
2099         task->tk_status = 0;
2100         task->tk_action = call_bind;
2101         rpc_check_timeout(task);
2102 }
2103
2104 /*
2105  * 4b.  Connect to the RPC server
2106  */
2107 static void
2108 call_connect(struct rpc_task *task)
2109 {
2110         struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
2111
2112         if (rpc_task_transmitted(task)) {
2113                 rpc_task_handle_transmitted(task);
2114                 return;
2115         }
2116
2117         if (xprt_connected(xprt)) {
2118                 task->tk_action = call_transmit;
2119                 return;
2120         }
2121
2122         task->tk_action = call_connect_status;
2123         if (task->tk_status < 0)
2124                 return;
2125         if (task->tk_flags & RPC_TASK_NOCONNECT) {
2126                 rpc_call_rpcerror(task, -ENOTCONN);
2127                 return;
2128         }
2129         if (!xprt_prepare_transmit(task))
2130                 return;
2131         xprt_connect(task);
2132 }
2133
2134 /*
2135  * 4c.  Sort out connect result
2136  */
2137 static void
2138 call_connect_status(struct rpc_task *task)
2139 {
2140         struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
2141         struct rpc_clnt *clnt = task->tk_client;
2142         int status = task->tk_status;
2143
2144         if (rpc_task_transmitted(task)) {
2145                 rpc_task_handle_transmitted(task);
2146                 return;
2147         }
2148
2149         trace_rpc_connect_status(task);
2150
2151         if (task->tk_status == 0) {
2152                 clnt->cl_stats->netreconn++;
2153                 goto out_next;
2154         }
2155         if (xprt_connected(xprt)) {
2156                 task->tk_status = 0;
2157                 goto out_next;
2158         }
2159
2160         task->tk_status = 0;
2161         switch (status) {
2162         case -ECONNREFUSED:
2163         case -ECONNRESET:
2164                 /* A positive refusal suggests a rebind is needed. */
2165                 if (RPC_IS_SOFTCONN(task))
2166                         break;
2167                 if (clnt->cl_autobind) {
2168                         rpc_force_rebind(clnt);
2169                         goto out_retry;
2170                 }
2171                 fallthrough;
2172         case -ECONNABORTED:
2173         case -ENETDOWN:
2174         case -ENETUNREACH:
2175         case -EHOSTUNREACH:
2176         case -EPIPE:
2177         case -EPROTO:
2178                 xprt_conditional_disconnect(task->tk_rqstp->rq_xprt,
2179                                             task->tk_rqstp->rq_connect_cookie);
2180                 if (RPC_IS_SOFTCONN(task))
2181                         break;
2182                 /* retry with existing socket, after a delay */
2183                 rpc_delay(task, 3*HZ);
2184                 fallthrough;
2185         case -EADDRINUSE:
2186         case -ENOTCONN:
2187         case -EAGAIN:
2188         case -ETIMEDOUT:
2189                 if (!(task->tk_flags & RPC_TASK_NO_ROUND_ROBIN) &&
2190                     (task->tk_flags & RPC_TASK_MOVEABLE) &&
2191                     test_bit(XPRT_REMOVE, &xprt->state)) {
2192                         struct rpc_xprt *saved = task->tk_xprt;
2193                         struct rpc_xprt_switch *xps;
2194
2195                         rcu_read_lock();
2196                         xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
2197                         rcu_read_unlock();
2198                         if (xps->xps_nxprts > 1) {
2199                                 long value;
2200
2201                                 xprt_release(task);
2202                                 value = atomic_long_dec_return(&xprt->queuelen);
2203                                 if (value == 0)
2204                                         rpc_xprt_switch_remove_xprt(xps, saved,
2205                                                                     true);
2206                                 xprt_put(saved);
2207                                 task->tk_xprt = NULL;
2208                                 task->tk_action = call_start;
2209                         }
2210                         xprt_switch_put(xps);
2211                         if (!task->tk_xprt)
2212                                 return;
2213                 }
2214                 goto out_retry;
2215         case -ENOBUFS:
2216                 rpc_delay(task, HZ >> 2);
2217                 goto out_retry;
2218         }
2219         rpc_call_rpcerror(task, status);
2220         return;
2221 out_next:
2222         task->tk_action = call_transmit;
2223         return;
2224 out_retry:
2225         /* Check for timeouts before looping back to call_bind */
2226         task->tk_action = call_bind;
2227         rpc_check_timeout(task);
2228 }
2229
2230 /*
2231  * 5.   Transmit the RPC request, and wait for reply
2232  */
2233 static void
2234 call_transmit(struct rpc_task *task)
2235 {
2236         if (rpc_task_transmitted(task)) {
2237                 rpc_task_handle_transmitted(task);
2238                 return;
2239         }
2240
2241         task->tk_action = call_transmit_status;
2242         if (!xprt_prepare_transmit(task))
2243                 return;
2244         task->tk_status = 0;
2245         if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) {
2246                 if (!xprt_connected(task->tk_xprt)) {
2247                         task->tk_status = -ENOTCONN;
2248                         return;
2249                 }
2250                 xprt_transmit(task);
2251         }
2252         xprt_end_transmit(task);
2253 }
2254
2255 /*
2256  * 5a.  Handle cleanup after a transmission
2257  */
2258 static void
2259 call_transmit_status(struct rpc_task *task)
2260 {
2261         task->tk_action = call_status;
2262
2263         /*
2264          * Common case: success.  Force the compiler to put this
2265          * test first.
2266          */
2267         if (rpc_task_transmitted(task)) {
2268                 task->tk_status = 0;
2269                 xprt_request_wait_receive(task);
2270                 return;
2271         }
2272
2273         switch (task->tk_status) {
2274         default:
2275                 break;
2276         case -EBADMSG:
2277                 task->tk_status = 0;
2278                 task->tk_action = call_encode;
2279                 break;
2280                 /*
2281                  * Special cases: if we've been waiting on the
2282                  * socket's write_space() callback, or if the
2283                  * socket just returned a connection error,
2284                  * then hold onto the transport lock.
2285                  */
2286         case -ENOMEM:
2287         case -ENOBUFS:
2288                 rpc_delay(task, HZ>>2);
2289                 fallthrough;
2290         case -EBADSLT:
2291         case -EAGAIN:
2292                 task->tk_action = call_transmit;
2293                 task->tk_status = 0;
2294                 break;
2295         case -ECONNREFUSED:
2296         case -EHOSTDOWN:
2297         case -ENETDOWN:
2298         case -EHOSTUNREACH:
2299         case -ENETUNREACH:
2300         case -EPERM:
2301                 if (RPC_IS_SOFTCONN(task)) {
2302                         if (!task->tk_msg.rpc_proc->p_proc)
2303                                 trace_xprt_ping(task->tk_xprt,
2304                                                 task->tk_status);
2305                         rpc_call_rpcerror(task, task->tk_status);
2306                         return;
2307                 }
2308                 fallthrough;
2309         case -ECONNRESET:
2310         case -ECONNABORTED:
2311         case -EADDRINUSE:
2312         case -ENOTCONN:
2313         case -EPIPE:
2314                 task->tk_action = call_bind;
2315                 task->tk_status = 0;
2316                 break;
2317         }
2318         rpc_check_timeout(task);
2319 }
2320
2321 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
2322 static void call_bc_transmit(struct rpc_task *task);
2323 static void call_bc_transmit_status(struct rpc_task *task);
2324
2325 static void
2326 call_bc_encode(struct rpc_task *task)
2327 {
2328         xprt_request_enqueue_transmit(task);
2329         task->tk_action = call_bc_transmit;
2330 }
2331
2332 /*
2333  * 5b.  Send the backchannel RPC reply.  On error, drop the reply.  In
2334  * addition, disconnect on connectivity errors.
2335  */
2336 static void
2337 call_bc_transmit(struct rpc_task *task)
2338 {
2339         task->tk_action = call_bc_transmit_status;
2340         if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) {
2341                 if (!xprt_prepare_transmit(task))
2342                         return;
2343                 task->tk_status = 0;
2344                 xprt_transmit(task);
2345         }
2346         xprt_end_transmit(task);
2347 }
2348
2349 static void
2350 call_bc_transmit_status(struct rpc_task *task)
2351 {
2352         struct rpc_rqst *req = task->tk_rqstp;
2353
2354         if (rpc_task_transmitted(task))
2355                 task->tk_status = 0;
2356
2357         switch (task->tk_status) {
2358         case 0:
2359                 /* Success */
2360         case -ENETDOWN:
2361         case -EHOSTDOWN:
2362         case -EHOSTUNREACH:
2363         case -ENETUNREACH:
2364         case -ECONNRESET:
2365         case -ECONNREFUSED:
2366         case -EADDRINUSE:
2367         case -ENOTCONN:
2368         case -EPIPE:
2369                 break;
2370         case -ENOMEM:
2371         case -ENOBUFS:
2372                 rpc_delay(task, HZ>>2);
2373                 fallthrough;
2374         case -EBADSLT:
2375         case -EAGAIN:
2376                 task->tk_status = 0;
2377                 task->tk_action = call_bc_transmit;
2378                 return;
2379         case -ETIMEDOUT:
2380                 /*
2381                  * Problem reaching the server.  Disconnect and let the
2382                  * forechannel reestablish the connection.  The server will
2383                  * have to retransmit the backchannel request and we'll
2384                  * reprocess it.  Since these ops are idempotent, there's no
2385                  * need to cache our reply at this time.
2386                  */
2387                 printk(KERN_NOTICE "RPC: Could not send backchannel reply "
2388                         "error: %d\n", task->tk_status);
2389                 xprt_conditional_disconnect(req->rq_xprt,
2390                         req->rq_connect_cookie);
2391                 break;
2392         default:
2393                 /*
2394                  * We were unable to reply and will have to drop the
2395                  * request.  The server should reconnect and retransmit.
2396                  */
2397                 printk(KERN_NOTICE "RPC: Could not send backchannel reply "
2398                         "error: %d\n", task->tk_status);
2399                 break;
2400         }
2401         task->tk_action = rpc_exit_task;
2402 }
2403 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
2404
2405 /*
2406  * 6.   Sort out the RPC call status
2407  */
2408 static void
2409 call_status(struct rpc_task *task)
2410 {
2411         struct rpc_clnt *clnt = task->tk_client;
2412         int             status;
2413
2414         if (!task->tk_msg.rpc_proc->p_proc)
2415                 trace_xprt_ping(task->tk_xprt, task->tk_status);
2416
2417         status = task->tk_status;
2418         if (status >= 0) {
2419                 task->tk_action = call_decode;
2420                 return;
2421         }
2422
2423         trace_rpc_call_status(task);
2424         task->tk_status = 0;
2425         switch(status) {
2426         case -EHOSTDOWN:
2427         case -ENETDOWN:
2428         case -EHOSTUNREACH:
2429         case -ENETUNREACH:
2430         case -EPERM:
2431                 if (RPC_IS_SOFTCONN(task))
2432                         goto out_exit;
2433                 /*
2434                  * Delay any retries for 3 seconds, then handle as if it
2435                  * were a timeout.
2436                  */
2437                 rpc_delay(task, 3*HZ);
2438                 fallthrough;
2439         case -ETIMEDOUT:
2440                 break;
2441         case -ECONNREFUSED:
2442         case -ECONNRESET:
2443         case -ECONNABORTED:
2444         case -ENOTCONN:
2445                 rpc_force_rebind(clnt);
2446                 break;
2447         case -EADDRINUSE:
2448                 rpc_delay(task, 3*HZ);
2449                 fallthrough;
2450         case -EPIPE:
2451         case -EAGAIN:
2452                 break;
2453         case -ENFILE:
2454         case -ENOBUFS:
2455         case -ENOMEM:
2456                 rpc_delay(task, HZ>>2);
2457                 break;
2458         case -EIO:
2459                 /* shutdown or soft timeout */
2460                 goto out_exit;
2461         default:
2462                 if (clnt->cl_chatty)
2463                         printk("%s: RPC call returned error %d\n",
2464                                clnt->cl_program->name, -status);
2465                 goto out_exit;
2466         }
2467         task->tk_action = call_encode;
2468         rpc_check_timeout(task);
2469         return;
2470 out_exit:
2471         rpc_call_rpcerror(task, status);
2472 }
2473
2474 static bool
2475 rpc_check_connected(const struct rpc_rqst *req)
2476 {
2477         /* No allocated request or transport? return true */
2478         if (!req || !req->rq_xprt)
2479                 return true;
2480         return xprt_connected(req->rq_xprt);
2481 }
2482
2483 static void
2484 rpc_check_timeout(struct rpc_task *task)
2485 {
2486         struct rpc_clnt *clnt = task->tk_client;
2487
2488         if (RPC_SIGNALLED(task))
2489                 return;
2490
2491         if (xprt_adjust_timeout(task->tk_rqstp) == 0)
2492                 return;
2493
2494         trace_rpc_timeout_status(task);
2495         task->tk_timeouts++;
2496
2497         if (RPC_IS_SOFTCONN(task) && !rpc_check_connected(task->tk_rqstp)) {
2498                 rpc_call_rpcerror(task, -ETIMEDOUT);
2499                 return;
2500         }
2501
2502         if (RPC_IS_SOFT(task)) {
2503                 /*
2504                  * Once a "no retrans timeout" soft tasks (a.k.a NFSv4) has
2505                  * been sent, it should time out only if the transport
2506                  * connection gets terminally broken.
2507                  */
2508                 if ((task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) &&
2509                     rpc_check_connected(task->tk_rqstp))
2510                         return;
2511
2512                 if (clnt->cl_chatty) {
2513                         pr_notice_ratelimited(
2514                                 "%s: server %s not responding, timed out\n",
2515                                 clnt->cl_program->name,
2516                                 task->tk_xprt->servername);
2517                 }
2518                 if (task->tk_flags & RPC_TASK_TIMEOUT)
2519                         rpc_call_rpcerror(task, -ETIMEDOUT);
2520                 else
2521                         __rpc_call_rpcerror(task, -EIO, -ETIMEDOUT);
2522                 return;
2523         }
2524
2525         if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
2526                 task->tk_flags |= RPC_CALL_MAJORSEEN;
2527                 if (clnt->cl_chatty) {
2528                         pr_notice_ratelimited(
2529                                 "%s: server %s not responding, still trying\n",
2530                                 clnt->cl_program->name,
2531                                 task->tk_xprt->servername);
2532                 }
2533         }
2534         rpc_force_rebind(clnt);
2535         /*
2536          * Did our request time out due to an RPCSEC_GSS out-of-sequence
2537          * event? RFC2203 requires the server to drop all such requests.
2538          */
2539         rpcauth_invalcred(task);
2540 }
2541
2542 /*
2543  * 7.   Decode the RPC reply
2544  */
2545 static void
2546 call_decode(struct rpc_task *task)
2547 {
2548         struct rpc_clnt *clnt = task->tk_client;
2549         struct rpc_rqst *req = task->tk_rqstp;
2550         struct xdr_stream xdr;
2551         int err;
2552
2553         if (!task->tk_msg.rpc_proc->p_decode) {
2554                 task->tk_action = rpc_exit_task;
2555                 return;
2556         }
2557
2558         if (task->tk_flags & RPC_CALL_MAJORSEEN) {
2559                 if (clnt->cl_chatty) {
2560                         pr_notice_ratelimited("%s: server %s OK\n",
2561                                 clnt->cl_program->name,
2562                                 task->tk_xprt->servername);
2563                 }
2564                 task->tk_flags &= ~RPC_CALL_MAJORSEEN;
2565         }
2566
2567         /*
2568          * Did we ever call xprt_complete_rqst()? If not, we should assume
2569          * the message is incomplete.
2570          */
2571         err = -EAGAIN;
2572         if (!req->rq_reply_bytes_recvd)
2573                 goto out;
2574
2575         /* Ensure that we see all writes made by xprt_complete_rqst()
2576          * before it changed req->rq_reply_bytes_recvd.
2577          */
2578         smp_rmb();
2579
2580         req->rq_rcv_buf.len = req->rq_private_buf.len;
2581         trace_rpc_xdr_recvfrom(task, &req->rq_rcv_buf);
2582
2583         /* Check that the softirq receive buffer is valid */
2584         WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
2585                                 sizeof(req->rq_rcv_buf)) != 0);
2586
2587         xdr_init_decode(&xdr, &req->rq_rcv_buf,
2588                         req->rq_rcv_buf.head[0].iov_base, req);
2589         err = rpc_decode_header(task, &xdr);
2590 out:
2591         switch (err) {
2592         case 0:
2593                 task->tk_action = rpc_exit_task;
2594                 task->tk_status = rpcauth_unwrap_resp(task, &xdr);
2595                 return;
2596         case -EAGAIN:
2597                 task->tk_status = 0;
2598                 if (task->tk_client->cl_discrtry)
2599                         xprt_conditional_disconnect(req->rq_xprt,
2600                                                     req->rq_connect_cookie);
2601                 task->tk_action = call_encode;
2602                 rpc_check_timeout(task);
2603                 break;
2604         case -EKEYREJECTED:
2605                 task->tk_action = call_reserve;
2606                 rpc_check_timeout(task);
2607                 rpcauth_invalcred(task);
2608                 /* Ensure we obtain a new XID if we retry! */
2609                 xprt_release(task);
2610         }
2611 }
2612
2613 static int
2614 rpc_encode_header(struct rpc_task *task, struct xdr_stream *xdr)
2615 {
2616         struct rpc_clnt *clnt = task->tk_client;
2617         struct rpc_rqst *req = task->tk_rqstp;
2618         __be32 *p;
2619         int error;
2620
2621         error = -EMSGSIZE;
2622         p = xdr_reserve_space(xdr, RPC_CALLHDRSIZE << 2);
2623         if (!p)
2624                 goto out_fail;
2625         *p++ = req->rq_xid;
2626         *p++ = rpc_call;
2627         *p++ = cpu_to_be32(RPC_VERSION);
2628         *p++ = cpu_to_be32(clnt->cl_prog);
2629         *p++ = cpu_to_be32(clnt->cl_vers);
2630         *p   = cpu_to_be32(task->tk_msg.rpc_proc->p_proc);
2631
2632         error = rpcauth_marshcred(task, xdr);
2633         if (error < 0)
2634                 goto out_fail;
2635         return 0;
2636 out_fail:
2637         trace_rpc_bad_callhdr(task);
2638         rpc_call_rpcerror(task, error);
2639         return error;
2640 }
2641
2642 static noinline int
2643 rpc_decode_header(struct rpc_task *task, struct xdr_stream *xdr)
2644 {
2645         struct rpc_clnt *clnt = task->tk_client;
2646         int error;
2647         __be32 *p;
2648
2649         /* RFC-1014 says that the representation of XDR data must be a
2650          * multiple of four bytes
2651  * - if it isn't, pointer subtraction in the NFS client may give
2652          *   undefined results
2653          */
2654         if (task->tk_rqstp->rq_rcv_buf.len & 3)
2655                 goto out_unparsable;
2656
2657         p = xdr_inline_decode(xdr, 3 * sizeof(*p));
2658         if (!p)
2659                 goto out_unparsable;
2660         p++;    /* skip XID */
2661         if (*p++ != rpc_reply)
2662                 goto out_unparsable;
2663         if (*p++ != rpc_msg_accepted)
2664                 goto out_msg_denied;
2665
2666         error = rpcauth_checkverf(task, xdr);
2667         if (error)
2668                 goto out_verifier;
2669
2670         p = xdr_inline_decode(xdr, sizeof(*p));
2671         if (!p)
2672                 goto out_unparsable;
2673         switch (*p) {
2674         case rpc_success:
2675                 return 0;
2676         case rpc_prog_unavail:
2677                 trace_rpc__prog_unavail(task);
2678                 error = -EPFNOSUPPORT;
2679                 goto out_err;
2680         case rpc_prog_mismatch:
2681                 trace_rpc__prog_mismatch(task);
2682                 error = -EPROTONOSUPPORT;
2683                 goto out_err;
2684         case rpc_proc_unavail:
2685                 trace_rpc__proc_unavail(task);
2686                 error = -EOPNOTSUPP;
2687                 goto out_err;
2688         case rpc_garbage_args:
2689         case rpc_system_err:
2690                 trace_rpc__garbage_args(task);
2691                 error = -EIO;
2692                 break;
2693         default:
2694                 goto out_unparsable;
2695         }
2696
2697 out_garbage:
2698         clnt->cl_stats->rpcgarbage++;
2699         if (task->tk_garb_retry) {
2700                 task->tk_garb_retry--;
2701                 task->tk_action = call_encode;
2702                 return -EAGAIN;
2703         }
2704 out_err:
2705         rpc_call_rpcerror(task, error);
2706         return error;
2707
2708 out_unparsable:
2709         trace_rpc__unparsable(task);
2710         error = -EIO;
2711         goto out_garbage;
2712
2713 out_verifier:
2714         trace_rpc_bad_verifier(task);
2715         goto out_garbage;
2716
2717 out_msg_denied:
2718         error = -EACCES;
2719         p = xdr_inline_decode(xdr, sizeof(*p));
2720         if (!p)
2721                 goto out_unparsable;
2722         switch (*p++) {
2723         case rpc_auth_error:
2724                 break;
2725         case rpc_mismatch:
2726                 trace_rpc__mismatch(task);
2727                 error = -EPROTONOSUPPORT;
2728                 goto out_err;
2729         default:
2730                 goto out_unparsable;
2731         }
2732
2733         p = xdr_inline_decode(xdr, sizeof(*p));
2734         if (!p)
2735                 goto out_unparsable;
2736         switch (*p++) {
2737         case rpc_autherr_rejectedcred:
2738         case rpc_autherr_rejectedverf:
2739         case rpcsec_gsserr_credproblem:
2740         case rpcsec_gsserr_ctxproblem:
2741                 rpcauth_invalcred(task);
2742                 if (!task->tk_cred_retry)
2743                         break;
2744                 task->tk_cred_retry--;
2745                 trace_rpc__stale_creds(task);
2746                 return -EKEYREJECTED;
2747         case rpc_autherr_badcred:
2748         case rpc_autherr_badverf:
2749                 /* possibly garbled cred/verf? */
2750                 if (!task->tk_garb_retry)
2751                         break;
2752                 task->tk_garb_retry--;
2753                 trace_rpc__bad_creds(task);
2754                 task->tk_action = call_encode;
2755                 return -EAGAIN;
2756         case rpc_autherr_tooweak:
2757                 trace_rpc__auth_tooweak(task);
2758                 pr_warn("RPC: server %s requires stronger authentication.\n",
2759                         task->tk_xprt->servername);
2760                 break;
2761         default:
2762                 goto out_unparsable;
2763         }
2764         goto out_err;
2765 }
2766
2767 static void rpcproc_encode_null(struct rpc_rqst *rqstp, struct xdr_stream *xdr,
2768                 const void *obj)
2769 {
2770 }
2771
2772 static int rpcproc_decode_null(struct rpc_rqst *rqstp, struct xdr_stream *xdr,
2773                 void *obj)
2774 {
2775         return 0;
2776 }
2777
2778 static const struct rpc_procinfo rpcproc_null = {
2779         .p_encode = rpcproc_encode_null,
2780         .p_decode = rpcproc_decode_null,
2781 };
2782
2783 static const struct rpc_procinfo rpcproc_null_noreply = {
2784         .p_encode = rpcproc_encode_null,
2785 };
2786
2787 static void
2788 rpc_null_call_prepare(struct rpc_task *task, void *data)
2789 {
2790         task->tk_flags &= ~RPC_TASK_NO_RETRANS_TIMEOUT;
2791         rpc_call_start(task);
2792 }
2793
2794 static const struct rpc_call_ops rpc_null_ops = {
2795         .rpc_call_prepare = rpc_null_call_prepare,
2796         .rpc_call_done = rpc_default_callback,
2797 };
2798
2799 static
2800 struct rpc_task *rpc_call_null_helper(struct rpc_clnt *clnt,
2801                 struct rpc_xprt *xprt, struct rpc_cred *cred, int flags,
2802                 const struct rpc_call_ops *ops, void *data)
2803 {
2804         struct rpc_message msg = {
2805                 .rpc_proc = &rpcproc_null,
2806         };
2807         struct rpc_task_setup task_setup_data = {
2808                 .rpc_client = clnt,
2809                 .rpc_xprt = xprt,
2810                 .rpc_message = &msg,
2811                 .rpc_op_cred = cred,
2812                 .callback_ops = ops ?: &rpc_null_ops,
2813                 .callback_data = data,
2814                 .flags = flags | RPC_TASK_SOFT | RPC_TASK_SOFTCONN |
2815                          RPC_TASK_NULLCREDS,
2816         };
2817
2818         return rpc_run_task(&task_setup_data);
2819 }
2820
2821 struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
2822 {
2823         return rpc_call_null_helper(clnt, NULL, cred, flags, NULL, NULL);
2824 }
2825 EXPORT_SYMBOL_GPL(rpc_call_null);
2826
2827 static int rpc_ping(struct rpc_clnt *clnt)
2828 {
2829         struct rpc_task *task;
2830         int status;
2831
2832         task = rpc_call_null_helper(clnt, NULL, NULL, 0, NULL, NULL);
2833         if (IS_ERR(task))
2834                 return PTR_ERR(task);
2835         status = task->tk_status;
2836         rpc_put_task(task);
2837         return status;
2838 }
2839
2840 static int rpc_ping_noreply(struct rpc_clnt *clnt)
2841 {
2842         struct rpc_message msg = {
2843                 .rpc_proc = &rpcproc_null_noreply,
2844         };
2845         struct rpc_task_setup task_setup_data = {
2846                 .rpc_client = clnt,
2847                 .rpc_message = &msg,
2848                 .callback_ops = &rpc_null_ops,
2849                 .flags = RPC_TASK_SOFT | RPC_TASK_SOFTCONN | RPC_TASK_NULLCREDS,
2850         };
2851         struct rpc_task *task;
2852         int status;
2853
2854         task = rpc_run_task(&task_setup_data);
2855         if (IS_ERR(task))
2856                 return PTR_ERR(task);
2857         status = task->tk_status;
2858         rpc_put_task(task);
2859         return status;
2860 }
2861
2862 struct rpc_cb_add_xprt_calldata {
2863         struct rpc_xprt_switch *xps;
2864         struct rpc_xprt *xprt;
2865 };
2866
2867 static void rpc_cb_add_xprt_done(struct rpc_task *task, void *calldata)
2868 {
2869         struct rpc_cb_add_xprt_calldata *data = calldata;
2870
2871         if (task->tk_status == 0)
2872                 rpc_xprt_switch_add_xprt(data->xps, data->xprt);
2873 }
2874
2875 static void rpc_cb_add_xprt_release(void *calldata)
2876 {
2877         struct rpc_cb_add_xprt_calldata *data = calldata;
2878
2879         xprt_put(data->xprt);
2880         xprt_switch_put(data->xps);
2881         kfree(data);
2882 }
2883
2884 static const struct rpc_call_ops rpc_cb_add_xprt_call_ops = {
2885         .rpc_call_prepare = rpc_null_call_prepare,
2886         .rpc_call_done = rpc_cb_add_xprt_done,
2887         .rpc_release = rpc_cb_add_xprt_release,
2888 };
2889
2890 /**
2891  * rpc_clnt_test_and_add_xprt - Test and add a new transport to a rpc_clnt
2892  * @clnt: pointer to struct rpc_clnt
2893  * @xps: pointer to struct rpc_xprt_switch
2894  * @xprt: pointer to struct rpc_xprt
2895  * @in_max_connect: pointer to the max_connect value for the passed in xprt transport
2896  */
2897 int rpc_clnt_test_and_add_xprt(struct rpc_clnt *clnt,
2898                 struct rpc_xprt_switch *xps, struct rpc_xprt *xprt,
2899                 void *in_max_connect)
2900 {
2901         struct rpc_cb_add_xprt_calldata *data;
2902         struct rpc_task *task;
2903         int max_connect = clnt->cl_max_connect;
2904
2905         if (in_max_connect)
2906                 max_connect = *(int *)in_max_connect;
2907         if (xps->xps_nunique_destaddr_xprts + 1 > max_connect) {
2908                 rcu_read_lock();
2909                 pr_warn("SUNRPC: reached max allowed number (%d) did not add "
2910                         "transport to server: %s\n", max_connect,
2911                         rpc_peeraddr2str(clnt, RPC_DISPLAY_ADDR));
2912                 rcu_read_unlock();
2913                 return -EINVAL;
2914         }
2915
2916         data = kmalloc(sizeof(*data), GFP_KERNEL);
2917         if (!data)
2918                 return -ENOMEM;
2919         data->xps = xprt_switch_get(xps);
2920         data->xprt = xprt_get(xprt);
2921         if (rpc_xprt_switch_has_addr(data->xps, (struct sockaddr *)&xprt->addr)) {
2922                 rpc_cb_add_xprt_release(data);
2923                 goto success;
2924         }
2925
2926         task = rpc_call_null_helper(clnt, xprt, NULL, RPC_TASK_ASYNC,
2927                         &rpc_cb_add_xprt_call_ops, data);
2928         if (IS_ERR(task))
2929                 return PTR_ERR(task);
2930
2931         data->xps->xps_nunique_destaddr_xprts++;
2932         rpc_put_task(task);
2933 success:
2934         return 1;
2935 }
2936 EXPORT_SYMBOL_GPL(rpc_clnt_test_and_add_xprt);
2937
2938 static int rpc_clnt_add_xprt_helper(struct rpc_clnt *clnt,
2939                                     struct rpc_xprt *xprt,
2940                                     struct rpc_add_xprt_test *data)
2941 {
2942         struct rpc_task *task;
2943         int status = -EADDRINUSE;
2944
2945         /* Test the connection */
2946         task = rpc_call_null_helper(clnt, xprt, NULL, 0, NULL, NULL);
2947         if (IS_ERR(task))
2948                 return PTR_ERR(task);
2949
2950         status = task->tk_status;
2951         rpc_put_task(task);
2952
2953         if (status < 0)
2954                 return status;
2955
2956         /* rpc_xprt_switch and rpc_xprt are dereferenced by add_xprt_test() */
2957         data->add_xprt_test(clnt, xprt, data->data);
2958
2959         return 0;
2960 }
2961
2962 /**
2963  * rpc_clnt_setup_test_and_add_xprt() - test a new transport before adding it
2964  *
2965  * This is an rpc_clnt_add_xprt() setup function which returns 1, so that:
2966  *   1) caller of the test function must dereference the rpc_xprt_switch
2967  *   and the rpc_xprt.
2968  *   2) test function must call rpc_xprt_switch_add_xprt, usually in
2969  *   the rpc_call_done routine.
2970  *
2971  * Upon success (return of 1), the test function adds the new
2972  * transport to the rpc_clnt xprt switch
2973  *
2974  * @clnt: struct rpc_clnt to get the new transport
2975  * @xps:  the rpc_xprt_switch to hold the new transport
2976  * @xprt: the rpc_xprt to test
2977  * @data: a struct rpc_add_xprt_test pointer that holds the test function
2978  *        and test function call data
2979  */
2980 int rpc_clnt_setup_test_and_add_xprt(struct rpc_clnt *clnt,
2981                                      struct rpc_xprt_switch *xps,
2982                                      struct rpc_xprt *xprt,
2983                                      void *data)
2984 {
2985         int status = -EADDRINUSE;
2986
2987         xprt = xprt_get(xprt);
2988         xprt_switch_get(xps);
2989
2990         if (rpc_xprt_switch_has_addr(xps, (struct sockaddr *)&xprt->addr))
2991                 goto out_err;
2992
2993         status = rpc_clnt_add_xprt_helper(clnt, xprt, data);
2994         if (status < 0)
2995                 goto out_err;
2996
2997         status = 1;
2998 out_err:
2999         xprt_put(xprt);
3000         xprt_switch_put(xps);
3001         if (status < 0)
3002                 pr_info("RPC:   rpc_clnt_test_xprt failed: %d addr %s not "
3003                         "added\n", status,
3004                         xprt->address_strings[RPC_DISPLAY_ADDR]);
3005         /* so that rpc_clnt_add_xprt does not call rpc_xprt_switch_add_xprt */
3006         return status;
3007 }
3008 EXPORT_SYMBOL_GPL(rpc_clnt_setup_test_and_add_xprt);
3009
3010 /**
3011  * rpc_clnt_add_xprt - Add a new transport to a rpc_clnt
3012  * @clnt: pointer to struct rpc_clnt
3013  * @xprtargs: pointer to struct xprt_create
3014  * @setup: callback to test and/or set up the connection
3015  * @data: pointer to setup function data
3016  *
3017  * Creates a new transport using the parameters set in args and
3018  * adds it to clnt.
3019  * If ping is set, then test that connectivity succeeds before
3020  * adding the new transport.
3021  *
3022  */
3023 int rpc_clnt_add_xprt(struct rpc_clnt *clnt,
3024                 struct xprt_create *xprtargs,
3025                 int (*setup)(struct rpc_clnt *,
3026                         struct rpc_xprt_switch *,
3027                         struct rpc_xprt *,
3028                         void *),
3029                 void *data)
3030 {
3031         struct rpc_xprt_switch *xps;
3032         struct rpc_xprt *xprt;
3033         unsigned long connect_timeout;
3034         unsigned long reconnect_timeout;
3035         unsigned char resvport, reuseport;
3036         int ret = 0, ident;
3037
3038         rcu_read_lock();
3039         xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
3040         xprt = xprt_iter_xprt(&clnt->cl_xpi);
3041         if (xps == NULL || xprt == NULL) {
3042                 rcu_read_unlock();
3043                 xprt_switch_put(xps);
3044                 return -EAGAIN;
3045         }
3046         resvport = xprt->resvport;
3047         reuseport = xprt->reuseport;
3048         connect_timeout = xprt->connect_timeout;
3049         reconnect_timeout = xprt->max_reconnect_timeout;
3050         ident = xprt->xprt_class->ident;
3051         rcu_read_unlock();
3052
3053         if (!xprtargs->ident)
3054                 xprtargs->ident = ident;
3055         xprt = xprt_create_transport(xprtargs);
3056         if (IS_ERR(xprt)) {
3057                 ret = PTR_ERR(xprt);
3058                 goto out_put_switch;
3059         }
3060         xprt->resvport = resvport;
3061         xprt->reuseport = reuseport;
3062         if (xprt->ops->set_connect_timeout != NULL)
3063                 xprt->ops->set_connect_timeout(xprt,
3064                                 connect_timeout,
3065                                 reconnect_timeout);
3066
3067         rpc_xprt_switch_set_roundrobin(xps);
3068         if (setup) {
3069                 ret = setup(clnt, xps, xprt, data);
3070                 if (ret != 0)
3071                         goto out_put_xprt;
3072         }
3073         rpc_xprt_switch_add_xprt(xps, xprt);
3074 out_put_xprt:
3075         xprt_put(xprt);
3076 out_put_switch:
3077         xprt_switch_put(xps);
3078         return ret;
3079 }
3080 EXPORT_SYMBOL_GPL(rpc_clnt_add_xprt);
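
/*
 * Illustrative sketch (editorial addition): a trunking-aware caller might
 * fill in a struct xprt_create for the new connection and pass
 * rpc_clnt_setup_test_and_add_xprt() as the setup callback.  The new
 * transport is then pinged first, and the caller-supplied test callback
 * (here the hypothetical "my_trunking_test") decides whether to add it,
 * e.g. via rpc_clnt_xprt_switch_add_xprt().  "addr", "addrlen" and
 * "servername" are also hypothetical caller-provided values.
 *
 *	struct rpc_add_xprt_test rdata = {
 *		.add_xprt_test	= my_trunking_test,
 *		.data		= NULL,
 *	};
 *	struct xprt_create xprtargs = {
 *		.ident		= XPRT_TRANSPORT_TCP,
 *		.net		= rpc_net_ns(clnt),
 *		.dstaddr	= (struct sockaddr *)&addr,
 *		.addrlen	= addrlen,
 *		.servername	= servername,
 *	};
 *	int err;
 *
 *	err = rpc_clnt_add_xprt(clnt, &xprtargs,
 *				rpc_clnt_setup_test_and_add_xprt, &rdata);
 */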
3081
3082 static int rpc_xprt_probe_trunked(struct rpc_clnt *clnt,
3083                                   struct rpc_xprt *xprt,
3084                                   struct rpc_add_xprt_test *data)
3085 {
3086         struct rpc_xprt_switch *xps;
3087         struct rpc_xprt *main_xprt;
3088         int status = 0;
3089
3090         xprt_get(xprt);
3091
3092         rcu_read_lock();
3093         main_xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
3094         xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
3095         status = rpc_cmp_addr_port((struct sockaddr *)&xprt->addr,
3096                                    (struct sockaddr *)&main_xprt->addr);
3097         rcu_read_unlock();
3098         xprt_put(main_xprt);
3099         if (status || !test_bit(XPRT_OFFLINE, &xprt->state))
3100                 goto out;
3101
3102         status = rpc_clnt_add_xprt_helper(clnt, xprt, data);
3103 out:
3104         xprt_put(xprt);
3105         xprt_switch_put(xps);
3106         return status;
3107 }
3108
3109 /* rpc_clnt_probe_trunked_xprts -- probe offlined transports for session trunking
3110  * @clnt rpc_clnt structure
3111  *
3112  * For each offlined transport found in the rpc_clnt structure, call
3113  * the function rpc_xprt_probe_trunked(), which will determine if this
3114  * transport still belongs to the trunking group.
3115  */
3116 void rpc_clnt_probe_trunked_xprts(struct rpc_clnt *clnt,
3117                                   struct rpc_add_xprt_test *data)
3118 {
3119         struct rpc_xprt_iter xpi;
3120         int ret;
3121
3122         ret = rpc_clnt_xprt_iter_offline_init(clnt, &xpi);
3123         if (ret)
3124                 return;
3125         for (;;) {
3126                 struct rpc_xprt *xprt = xprt_iter_get_next(&xpi);
3127
3128                 if (!xprt)
3129                         break;
3130                 ret = rpc_xprt_probe_trunked(clnt, xprt, data);
3131                 xprt_put(xprt);
3132                 if (ret < 0)
3133                         break;
3134                 xprt_iter_rewind(&xpi);
3135         }
3136         xprt_iter_destroy(&xpi);
3137 }
3138 EXPORT_SYMBOL_GPL(rpc_clnt_probe_trunked_xprts);
3139
3140 static int rpc_xprt_offline(struct rpc_clnt *clnt,
3141                             struct rpc_xprt *xprt,
3142                             void *data)
3143 {
3144         struct rpc_xprt *main_xprt;
3145         struct rpc_xprt_switch *xps;
3146         int err = 0;
3147
3148         xprt_get(xprt);
3149
3150         rcu_read_lock();
3151         main_xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
3152         xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
3153         err = rpc_cmp_addr_port((struct sockaddr *)&xprt->addr,
3154                                 (struct sockaddr *)&main_xprt->addr);
3155         rcu_read_unlock();
3156         xprt_put(main_xprt);
3157         if (err)
3158                 goto out;
3159
3160         if (wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_KILLABLE)) {
3161                 err = -EINTR;
3162                 goto out;
3163         }
3164         xprt_set_offline_locked(xprt, xps);
3165
3166         xprt_release_write(xprt, NULL);
3167 out:
3168         xprt_put(xprt);
3169         xprt_switch_put(xps);
3170         return err;
3171 }
3172
3173 /* rpc_clnt_manage_trunked_xprts -- offline trunked transports
3174  * @clnt rpc_clnt structure
3175  *
3176  * For each active transport found in the rpc_clnt structure, call
3177  * the function rpc_xprt_offline(), which will identify trunked transports
3178  * and will mark them offline.
3179  */
3180 void rpc_clnt_manage_trunked_xprts(struct rpc_clnt *clnt)
3181 {
3182         rpc_clnt_iterate_for_each_xprt(clnt, rpc_xprt_offline, NULL);
3183 }
3184 EXPORT_SYMBOL_GPL(rpc_clnt_manage_trunked_xprts);
3185
3186 struct connect_timeout_data {
3187         unsigned long connect_timeout;
3188         unsigned long reconnect_timeout;
3189 };
3190
3191 static int
3192 rpc_xprt_set_connect_timeout(struct rpc_clnt *clnt,
3193                 struct rpc_xprt *xprt,
3194                 void *data)
3195 {
3196         struct connect_timeout_data *timeo = data;
3197
3198         if (xprt->ops->set_connect_timeout)
3199                 xprt->ops->set_connect_timeout(xprt,
3200                                 timeo->connect_timeout,
3201                                 timeo->reconnect_timeout);
3202         return 0;
3203 }
3204
3205 void
3206 rpc_set_connect_timeout(struct rpc_clnt *clnt,
3207                 unsigned long connect_timeout,
3208                 unsigned long reconnect_timeout)
3209 {
3210         struct connect_timeout_data timeout = {
3211                 .connect_timeout = connect_timeout,
3212                 .reconnect_timeout = reconnect_timeout,
3213         };
3214         rpc_clnt_iterate_for_each_xprt(clnt,
3215                         rpc_xprt_set_connect_timeout,
3216                         &timeout);
3217 }
3218 EXPORT_SYMBOL_GPL(rpc_set_connect_timeout);
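
/*
 * Illustrative sketch (editorial addition): both timeout arguments are
 * handed unchanged to each transport's set_connect_timeout method; for
 * the socket transports they are jiffies values.  A caller wanting
 * roughly a 15 second connect timeout with reconnect back-off capped at
 * 30 seconds might write:
 *
 *	rpc_set_connect_timeout(clnt, 15 * HZ, 30 * HZ);
 */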
3219
void rpc_clnt_xprt_switch_put(struct rpc_clnt *clnt)
{
        rcu_read_lock();
        xprt_switch_put(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
        rcu_read_unlock();
}
EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_put);

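/* Mark @xprt online in @clnt's transport switch. */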
void rpc_clnt_xprt_set_online(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
{
        struct rpc_xprt_switch *xps;

        rcu_read_lock();
        xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
        rcu_read_unlock();
        xprt_set_online_locked(xprt, xps);
}

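/**
 * rpc_clnt_xprt_switch_add_xprt - add a transport to the client's switch
 * @clnt: pointer to struct rpc_clnt
 * @xprt: transport to add
 *
 * If a transport with @xprt's address is already present in the switch,
 * @xprt is only marked online; otherwise @xprt is added to the switch.
 */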
void rpc_clnt_xprt_switch_add_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
{
        if (rpc_clnt_xprt_switch_has_addr(clnt,
                (const struct sockaddr *)&xprt->addr)) {
                return rpc_clnt_xprt_set_online(clnt, xprt);
        }
        rcu_read_lock();
        rpc_xprt_switch_add_xprt(rcu_dereference(clnt->cl_xpi.xpi_xpswitch),
                                 xprt);
        rcu_read_unlock();
}
EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_add_xprt);

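/**
 * rpc_clnt_xprt_switch_remove_xprt - remove a transport from the client's switch
 * @clnt: pointer to struct rpc_clnt
 * @xprt: transport to remove
 */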
void rpc_clnt_xprt_switch_remove_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
{
        struct rpc_xprt_switch *xps;

        rcu_read_lock();
        xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
        rpc_xprt_switch_remove_xprt(xps, xprt, 0);
        xps->xps_nunique_destaddr_xprts--;
        rcu_read_unlock();
}
EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_remove_xprt);

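/**
 * rpc_clnt_xprt_switch_has_addr - check the switch for a transport address
 * @clnt: pointer to struct rpc_clnt
 * @sap: socket address to look for
 *
 * Returns true if @clnt's transport switch already contains a transport
 * with the address @sap.
 */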
bool rpc_clnt_xprt_switch_has_addr(struct rpc_clnt *clnt,
                                   const struct sockaddr *sap)
{
        struct rpc_xprt_switch *xps;
        bool ret;

        rcu_read_lock();
        xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
        ret = rpc_xprt_switch_has_addr(xps, sap);
        rcu_read_unlock();
        return ret;
}
EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_has_addr);

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
static void rpc_show_header(void)
{
        printk(KERN_INFO "-pid- flgs status -client- --rqstp- "
                "-timeout ---ops--\n");
}

static void rpc_show_task(const struct rpc_clnt *clnt,
                          const struct rpc_task *task)
{
        const char *rpc_waitq = "none";

        if (RPC_IS_QUEUED(task))
                rpc_waitq = rpc_qname(task->tk_waitqueue);

        printk(KERN_INFO "%5u %04x %6d %8p %8p %8ld %8p %sv%u %s a:%ps q:%s\n",
                task->tk_pid, task->tk_flags, task->tk_status,
                clnt, task->tk_rqstp, rpc_task_timeout(task), task->tk_ops,
                clnt->cl_program->name, clnt->cl_vers, rpc_proc_name(task),
                task->tk_action, rpc_waitq);
}

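/*
 * Debugging aid: print a one-line summary (see rpc_show_task()) of every
 * task owned by every RPC client in network namespace @net.
 */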
void rpc_show_tasks(struct net *net)
{
        struct rpc_clnt *clnt;
        struct rpc_task *task;
        int header = 0;
        struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);

        spin_lock(&sn->rpc_client_lock);
        list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
                spin_lock(&clnt->cl_lock);
                list_for_each_entry(task, &clnt->cl_tasks, tk_task) {
                        if (!header) {
                                rpc_show_header();
                                header++;
                        }
                        rpc_show_task(clnt, task);
                }
                spin_unlock(&clnt->cl_lock);
        }
        spin_unlock(&sn->rpc_client_lock);
}
#endif

#if IS_ENABLED(CONFIG_SUNRPC_SWAP)
static int
rpc_clnt_swap_activate_callback(struct rpc_clnt *clnt,
                struct rpc_xprt *xprt,
                void *dummy)
{
        return xprt_enable_swap(xprt);
}

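/**
 * rpc_clnt_swap_activate - mark a client as carrying swap traffic
 * @clnt: pointer to struct rpc_clnt
 *
 * Walks up to the top-level parent client and bumps its swapper count;
 * on the first activation, xprt_enable_swap() is called for each of the
 * parent's transports.
 */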
int
rpc_clnt_swap_activate(struct rpc_clnt *clnt)
{
        while (clnt != clnt->cl_parent)
                clnt = clnt->cl_parent;
        if (atomic_inc_return(&clnt->cl_swapper) == 1)
                return rpc_clnt_iterate_for_each_xprt(clnt,
                                rpc_clnt_swap_activate_callback, NULL);
        return 0;
}
EXPORT_SYMBOL_GPL(rpc_clnt_swap_activate);

static int
rpc_clnt_swap_deactivate_callback(struct rpc_clnt *clnt,
                struct rpc_xprt *xprt,
                void *dummy)
{
        xprt_disable_swap(xprt);
        return 0;
}

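/**
 * rpc_clnt_swap_deactivate - undo a previous rpc_clnt_swap_activate() call
 * @clnt: pointer to struct rpc_clnt
 *
 * When the last activation is dropped, xprt_disable_swap() is called for
 * each transport of the top-level parent client.
 */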
void
rpc_clnt_swap_deactivate(struct rpc_clnt *clnt)
{
        while (clnt != clnt->cl_parent)
                clnt = clnt->cl_parent;
        if (atomic_dec_if_positive(&clnt->cl_swapper) == 0)
                rpc_clnt_iterate_for_each_xprt(clnt,
                                rpc_clnt_swap_deactivate_callback, NULL);
}
EXPORT_SYMBOL_GPL(rpc_clnt_swap_deactivate);
#endif /* CONFIG_SUNRPC_SWAP */