SUNRPC: skip dead but not buried clients on PipeFS events
[linux-flexiantxendom0-3.2.10.git] / net / sunrpc / clnt.c
1 /*
2  *  linux/net/sunrpc/clnt.c
3  *
4  *  This file contains the high-level RPC interface.
5  *  It is modeled as a finite state machine to support both synchronous
6  *  and asynchronous requests.
7  *
8  *  -   RPC header generation and argument serialization.
9  *  -   Credential refresh.
10  *  -   TCP connect handling.
11  *  -   Retry of operation when it is suspected the operation failed because
12  *      of uid squashing on the server, or when the credentials were stale
13  *      and need to be refreshed, or when a packet was damaged in transit.
14  *      This may be have to be moved to the VFS layer.
15  *
16  *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
17  *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
18  */
19
20
21 #include <linux/module.h>
22 #include <linux/types.h>
23 #include <linux/kallsyms.h>
24 #include <linux/mm.h>
25 #include <linux/namei.h>
26 #include <linux/mount.h>
27 #include <linux/slab.h>
28 #include <linux/utsname.h>
29 #include <linux/workqueue.h>
30 #include <linux/in.h>
31 #include <linux/in6.h>
32 #include <linux/un.h>
33 #include <linux/rcupdate.h>
34
35 #include <linux/sunrpc/clnt.h>
36 #include <linux/sunrpc/rpc_pipe_fs.h>
37 #include <linux/sunrpc/metrics.h>
38 #include <linux/sunrpc/bc_xprt.h>
39 #include <trace/events/sunrpc.h>
40
41 #include "sunrpc.h"
42 #include "netns.h"
43
44 #ifdef RPC_DEBUG
45 # define RPCDBG_FACILITY        RPCDBG_CALL
46 #endif
47
48 #define dprint_status(t)                                        \
49         dprintk("RPC: %5u %s (status %d)\n", t->tk_pid,         \
50                         __func__, t->tk_status)
51
52 /*
53  * All RPC clients are linked into this list
54  */
55
56 static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
57
58
59 static void     call_start(struct rpc_task *task);
60 static void     call_reserve(struct rpc_task *task);
61 static void     call_reserveresult(struct rpc_task *task);
62 static void     call_allocate(struct rpc_task *task);
63 static void     call_decode(struct rpc_task *task);
64 static void     call_bind(struct rpc_task *task);
65 static void     call_bind_status(struct rpc_task *task);
66 static void     call_transmit(struct rpc_task *task);
67 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
68 static void     call_bc_transmit(struct rpc_task *task);
69 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
70 static void     call_status(struct rpc_task *task);
71 static void     call_transmit_status(struct rpc_task *task);
72 static void     call_refresh(struct rpc_task *task);
73 static void     call_refreshresult(struct rpc_task *task);
74 static void     call_timeout(struct rpc_task *task);
75 static void     call_connect(struct rpc_task *task);
76 static void     call_connect_status(struct rpc_task *task);
77
78 static __be32   *rpc_encode_header(struct rpc_task *task);
79 static __be32   *rpc_verify_header(struct rpc_task *task);
80 static int      rpc_ping(struct rpc_clnt *clnt);
81
82 static void rpc_register_client(struct rpc_clnt *clnt)
83 {
84         struct net *net = rpc_net_ns(clnt);
85         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
86
87         spin_lock(&sn->rpc_client_lock);
88         list_add(&clnt->cl_clients, &sn->all_clients);
89         spin_unlock(&sn->rpc_client_lock);
90 }
91
92 static void rpc_unregister_client(struct rpc_clnt *clnt)
93 {
94         struct net *net = rpc_net_ns(clnt);
95         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
96
97         spin_lock(&sn->rpc_client_lock);
98         list_del(&clnt->cl_clients);
99         spin_unlock(&sn->rpc_client_lock);
100 }
101
102 static void __rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
103 {
104         if (clnt->cl_dentry) {
105                 if (clnt->cl_auth && clnt->cl_auth->au_ops->pipes_destroy)
106                         clnt->cl_auth->au_ops->pipes_destroy(clnt->cl_auth);
107                 rpc_remove_client_dir(clnt->cl_dentry);
108         }
109         clnt->cl_dentry = NULL;
110 }
111
112 static void rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
113 {
114         struct net *net = rpc_net_ns(clnt);
115         struct super_block *pipefs_sb;
116
117         pipefs_sb = rpc_get_sb_net(net);
118         if (pipefs_sb) {
119                 __rpc_clnt_remove_pipedir(clnt);
120                 rpc_put_sb_net(net);
121         }
122 }
123
124 static struct dentry *rpc_setup_pipedir_sb(struct super_block *sb,
125                                     struct rpc_clnt *clnt,
126                                     const char *dir_name)
127 {
128         static uint32_t clntid;
129         char name[15];
130         struct qstr q = {
131                 .name = name,
132         };
133         struct dentry *dir, *dentry;
134         int error;
135
136         dir = rpc_d_lookup_sb(sb, dir_name);
137         if (dir == NULL)
138                 return dir;
139         for (;;) {
140                 q.len = snprintf(name, sizeof(name), "clnt%x", (unsigned int)clntid++);
141                 name[sizeof(name) - 1] = '\0';
142                 q.hash = full_name_hash(q.name, q.len);
143                 dentry = rpc_create_client_dir(dir, &q, clnt);
144                 if (!IS_ERR(dentry))
145                         break;
146                 error = PTR_ERR(dentry);
147                 if (error != -EEXIST) {
148                         printk(KERN_INFO "RPC: Couldn't create pipefs entry"
149                                         " %s/%s, error %d\n",
150                                         dir_name, name, error);
151                         break;
152                 }
153         }
154         dput(dir);
155         return dentry;
156 }
157
158 static int
159 rpc_setup_pipedir(struct rpc_clnt *clnt, const char *dir_name)
160 {
161         struct net *net = rpc_net_ns(clnt);
162         struct super_block *pipefs_sb;
163         struct dentry *dentry;
164
165         clnt->cl_dentry = NULL;
166         if (dir_name == NULL)
167                 return 0;
168         pipefs_sb = rpc_get_sb_net(net);
169         if (!pipefs_sb)
170                 return 0;
171         dentry = rpc_setup_pipedir_sb(pipefs_sb, clnt, dir_name);
172         rpc_put_sb_net(net);
173         if (IS_ERR(dentry))
174                 return PTR_ERR(dentry);
175         clnt->cl_dentry = dentry;
176         return 0;
177 }
178
179 static int __rpc_pipefs_event(struct rpc_clnt *clnt, unsigned long event,
180                                 struct super_block *sb)
181 {
182         struct dentry *dentry;
183         int err = 0;
184
185         switch (event) {
186         case RPC_PIPEFS_MOUNT:
187                 if (clnt->cl_program->pipe_dir_name == NULL)
188                         break;
189                 dentry = rpc_setup_pipedir_sb(sb, clnt,
190                                               clnt->cl_program->pipe_dir_name);
191                 BUG_ON(dentry == NULL);
192                 if (IS_ERR(dentry))
193                         return PTR_ERR(dentry);
194                 clnt->cl_dentry = dentry;
195                 if (clnt->cl_auth->au_ops->pipes_create) {
196                         err = clnt->cl_auth->au_ops->pipes_create(clnt->cl_auth);
197                         if (err)
198                                 __rpc_clnt_remove_pipedir(clnt);
199                 }
200                 break;
201         case RPC_PIPEFS_UMOUNT:
202                 __rpc_clnt_remove_pipedir(clnt);
203                 break;
204         default:
205                 printk(KERN_ERR "%s: unknown event: %ld\n", __func__, event);
206                 return -ENOTSUPP;
207         }
208         return err;
209 }
210
211 static struct rpc_clnt *rpc_get_client_for_event(struct net *net, int event)
212 {
213         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
214         struct rpc_clnt *clnt;
215
216         spin_lock(&sn->rpc_client_lock);
217         list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
218                 if (((event == RPC_PIPEFS_MOUNT) && clnt->cl_dentry) ||
219                     ((event == RPC_PIPEFS_UMOUNT) && !clnt->cl_dentry))
220                         continue;
221                 if (atomic_inc_not_zero(&clnt->cl_count) == 0)
222                         continue;
223                 spin_unlock(&sn->rpc_client_lock);
224                 return clnt;
225         }
226         spin_unlock(&sn->rpc_client_lock);
227         return NULL;
228 }
229
230 static int rpc_pipefs_event(struct notifier_block *nb, unsigned long event,
231                             void *ptr)
232 {
233         struct super_block *sb = ptr;
234         struct rpc_clnt *clnt;
235         int error = 0;
236
237         while ((clnt = rpc_get_client_for_event(sb->s_fs_info, event))) {
238                 error = __rpc_pipefs_event(clnt, event, sb);
239                 rpc_release_client(clnt);
240                 if (error)
241                         break;
242         }
243         return error;
244 }
245
246 static struct notifier_block rpc_clients_block = {
247         .notifier_call  = rpc_pipefs_event,
248         .priority       = SUNRPC_PIPEFS_RPC_PRIO,
249 };
250
251 int rpc_clients_notifier_register(void)
252 {
253         return rpc_pipefs_notifier_register(&rpc_clients_block);
254 }
255
256 void rpc_clients_notifier_unregister(void)
257 {
258         return rpc_pipefs_notifier_unregister(&rpc_clients_block);
259 }
260
261 static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args, struct rpc_xprt *xprt)
262 {
263         const struct rpc_program *program = args->program;
264         const struct rpc_version *version;
265         struct rpc_clnt         *clnt = NULL;
266         struct rpc_auth         *auth;
267         int err;
268
269         /* sanity check the name before trying to print it */
270         dprintk("RPC:       creating %s client for %s (xprt %p)\n",
271                         program->name, args->servername, xprt);
272
273         err = rpciod_up();
274         if (err)
275                 goto out_no_rpciod;
276         err = -EINVAL;
277         if (!xprt)
278                 goto out_no_xprt;
279
280         if (args->version >= program->nrvers)
281                 goto out_err;
282         version = program->version[args->version];
283         if (version == NULL)
284                 goto out_err;
285
286         err = -ENOMEM;
287         clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
288         if (!clnt)
289                 goto out_err;
290         clnt->cl_parent = clnt;
291
292         rcu_assign_pointer(clnt->cl_xprt, xprt);
293         clnt->cl_procinfo = version->procs;
294         clnt->cl_maxproc  = version->nrprocs;
295         clnt->cl_protname = program->name;
296         clnt->cl_prog     = args->prognumber ? : program->number;
297         clnt->cl_vers     = version->number;
298         clnt->cl_stats    = program->stats;
299         clnt->cl_metrics  = rpc_alloc_iostats(clnt);
300         err = -ENOMEM;
301         if (clnt->cl_metrics == NULL)
302                 goto out_no_stats;
303         clnt->cl_program  = program;
304         INIT_LIST_HEAD(&clnt->cl_tasks);
305         spin_lock_init(&clnt->cl_lock);
306
307         if (!xprt_bound(xprt))
308                 clnt->cl_autobind = 1;
309
310         clnt->cl_timeout = xprt->timeout;
311         if (args->timeout != NULL) {
312                 memcpy(&clnt->cl_timeout_default, args->timeout,
313                                 sizeof(clnt->cl_timeout_default));
314                 clnt->cl_timeout = &clnt->cl_timeout_default;
315         }
316
317         clnt->cl_rtt = &clnt->cl_rtt_default;
318         rpc_init_rtt(&clnt->cl_rtt_default, clnt->cl_timeout->to_initval);
319         clnt->cl_principal = NULL;
320         if (args->client_name) {
321                 clnt->cl_principal = kstrdup(args->client_name, GFP_KERNEL);
322                 if (!clnt->cl_principal)
323                         goto out_no_principal;
324         }
325
326         atomic_set(&clnt->cl_count, 1);
327
328         err = rpc_setup_pipedir(clnt, program->pipe_dir_name);
329         if (err < 0)
330                 goto out_no_path;
331
332         auth = rpcauth_create(args->authflavor, clnt);
333         if (IS_ERR(auth)) {
334                 printk(KERN_INFO "RPC: Couldn't create auth handle (flavor %u)\n",
335                                 args->authflavor);
336                 err = PTR_ERR(auth);
337                 goto out_no_auth;
338         }
339
340         /* save the nodename */
341         clnt->cl_nodelen = strlen(init_utsname()->nodename);
342         if (clnt->cl_nodelen > UNX_MAXNODENAME)
343                 clnt->cl_nodelen = UNX_MAXNODENAME;
344         memcpy(clnt->cl_nodename, init_utsname()->nodename, clnt->cl_nodelen);
345         rpc_register_client(clnt);
346         return clnt;
347
348 out_no_auth:
349         rpc_clnt_remove_pipedir(clnt);
350 out_no_path:
351         kfree(clnt->cl_principal);
352 out_no_principal:
353         rpc_free_iostats(clnt->cl_metrics);
354 out_no_stats:
355         kfree(clnt);
356 out_err:
357         xprt_put(xprt);
358 out_no_xprt:
359         rpciod_down();
360 out_no_rpciod:
361         return ERR_PTR(err);
362 }
363
364 /*
365  * rpc_create - create an RPC client and transport with one call
366  * @args: rpc_clnt create argument structure
367  *
368  * Creates and initializes an RPC transport and an RPC client.
369  *
370  * It can ping the server in order to determine if it is up, and to see if
371  * it supports this program and version.  RPC_CLNT_CREATE_NOPING disables
372  * this behavior so asynchronous tasks can also use rpc_create.
373  */
374 struct rpc_clnt *rpc_create(struct rpc_create_args *args)
375 {
376         struct rpc_xprt *xprt;
377         struct rpc_clnt *clnt;
378         struct xprt_create xprtargs = {
379                 .net = args->net,
380                 .ident = args->protocol,
381                 .srcaddr = args->saddress,
382                 .dstaddr = args->address,
383                 .addrlen = args->addrsize,
384                 .servername = args->servername,
385                 .bc_xprt = args->bc_xprt,
386         };
387         char servername[48];
388
389         /*
390          * If the caller chooses not to specify a hostname, whip
391          * up a string representation of the passed-in address.
392          */
393         if (xprtargs.servername == NULL) {
394                 struct sockaddr_un *sun =
395                                 (struct sockaddr_un *)args->address;
396                 struct sockaddr_in *sin =
397                                 (struct sockaddr_in *)args->address;
398                 struct sockaddr_in6 *sin6 =
399                                 (struct sockaddr_in6 *)args->address;
400
401                 servername[0] = '\0';
402                 switch (args->address->sa_family) {
403                 case AF_LOCAL:
404                         snprintf(servername, sizeof(servername), "%s",
405                                  sun->sun_path);
406                         break;
407                 case AF_INET:
408                         snprintf(servername, sizeof(servername), "%pI4",
409                                  &sin->sin_addr.s_addr);
410                         break;
411                 case AF_INET6:
412                         snprintf(servername, sizeof(servername), "%pI6",
413                                  &sin6->sin6_addr);
414                         break;
415                 default:
416                         /* caller wants default server name, but
417                          * address family isn't recognized. */
418                         return ERR_PTR(-EINVAL);
419                 }
420                 xprtargs.servername = servername;
421         }
422
423         xprt = xprt_create_transport(&xprtargs);
424         if (IS_ERR(xprt))
425                 return (struct rpc_clnt *)xprt;
426
427         /*
428          * By default, kernel RPC client connects from a reserved port.
429          * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
430          * but it is always enabled for rpciod, which handles the connect
431          * operation.
432          */
433         xprt->resvport = 1;
434         if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
435                 xprt->resvport = 0;
436
437         clnt = rpc_new_client(args, xprt);
438         if (IS_ERR(clnt))
439                 return clnt;
440
441         if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
442                 int err = rpc_ping(clnt);
443                 if (err != 0) {
444                         rpc_shutdown_client(clnt);
445                         return ERR_PTR(err);
446                 }
447         }
448
449         clnt->cl_softrtry = 1;
450         if (args->flags & RPC_CLNT_CREATE_HARDRTRY)
451                 clnt->cl_softrtry = 0;
452
453         if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
454                 clnt->cl_autobind = 1;
455         if (args->flags & RPC_CLNT_CREATE_DISCRTRY)
456                 clnt->cl_discrtry = 1;
457         if (!(args->flags & RPC_CLNT_CREATE_QUIET))
458                 clnt->cl_chatty = 1;
459
460         return clnt;
461 }
462 EXPORT_SYMBOL_GPL(rpc_create);
463
464 /*
465  * This function clones the RPC client structure. It allows us to share the
466  * same transport while varying parameters such as the authentication
467  * flavour.
468  */
469 struct rpc_clnt *
470 rpc_clone_client(struct rpc_clnt *clnt)
471 {
472         struct rpc_clnt *new;
473         struct rpc_xprt *xprt;
474         int err = -ENOMEM;
475
476         new = kmemdup(clnt, sizeof(*new), GFP_KERNEL);
477         if (!new)
478                 goto out_no_clnt;
479         new->cl_parent = clnt;
480         /* Turn off autobind on clones */
481         new->cl_autobind = 0;
482         INIT_LIST_HEAD(&new->cl_tasks);
483         spin_lock_init(&new->cl_lock);
484         rpc_init_rtt(&new->cl_rtt_default, clnt->cl_timeout->to_initval);
485         new->cl_metrics = rpc_alloc_iostats(clnt);
486         if (new->cl_metrics == NULL)
487                 goto out_no_stats;
488         if (clnt->cl_principal) {
489                 new->cl_principal = kstrdup(clnt->cl_principal, GFP_KERNEL);
490                 if (new->cl_principal == NULL)
491                         goto out_no_principal;
492         }
493         rcu_read_lock();
494         xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
495         rcu_read_unlock();
496         if (xprt == NULL)
497                 goto out_no_transport;
498         rcu_assign_pointer(new->cl_xprt, xprt);
499         atomic_set(&new->cl_count, 1);
500         err = rpc_setup_pipedir(new, clnt->cl_program->pipe_dir_name);
501         if (err != 0)
502                 goto out_no_path;
503         if (new->cl_auth)
504                 atomic_inc(&new->cl_auth->au_count);
505         atomic_inc(&clnt->cl_count);
506         rpc_register_client(new);
507         rpciod_up();
508         return new;
509 out_no_path:
510         xprt_put(xprt);
511 out_no_transport:
512         kfree(new->cl_principal);
513 out_no_principal:
514         rpc_free_iostats(new->cl_metrics);
515 out_no_stats:
516         kfree(new);
517 out_no_clnt:
518         dprintk("RPC:       %s: returned error %d\n", __func__, err);
519         return ERR_PTR(err);
520 }
521 EXPORT_SYMBOL_GPL(rpc_clone_client);
522
523 /*
524  * Kill all tasks for the given client.
525  * XXX: kill their descendants as well?
526  */
527 void rpc_killall_tasks(struct rpc_clnt *clnt)
528 {
529         struct rpc_task *rovr;
530
531
532         if (list_empty(&clnt->cl_tasks))
533                 return;
534         dprintk("RPC:       killing all tasks for client %p\n", clnt);
535         /*
536          * Spin lock all_tasks to prevent changes...
537          */
538         spin_lock(&clnt->cl_lock);
539         list_for_each_entry(rovr, &clnt->cl_tasks, tk_task) {
540                 if (!RPC_IS_ACTIVATED(rovr))
541                         continue;
542                 if (!(rovr->tk_flags & RPC_TASK_KILLED)) {
543                         rovr->tk_flags |= RPC_TASK_KILLED;
544                         rpc_exit(rovr, -EIO);
545                         if (RPC_IS_QUEUED(rovr))
546                                 rpc_wake_up_queued_task(rovr->tk_waitqueue,
547                                                         rovr);
548                 }
549         }
550         spin_unlock(&clnt->cl_lock);
551 }
552 EXPORT_SYMBOL_GPL(rpc_killall_tasks);
553
554 /*
555  * Properly shut down an RPC client, terminating all outstanding
556  * requests.
557  */
558 void rpc_shutdown_client(struct rpc_clnt *clnt)
559 {
560         dprintk_rcu("RPC:       shutting down %s client for %s\n",
561                         clnt->cl_protname,
562                         rcu_dereference(clnt->cl_xprt)->servername);
563
564         while (!list_empty(&clnt->cl_tasks)) {
565                 rpc_killall_tasks(clnt);
566                 wait_event_timeout(destroy_wait,
567                         list_empty(&clnt->cl_tasks), 1*HZ);
568         }
569
570         rpc_release_client(clnt);
571 }
572 EXPORT_SYMBOL_GPL(rpc_shutdown_client);
573
574 /*
575  * Free an RPC client
576  */
577 static void
578 rpc_free_client(struct rpc_clnt *clnt)
579 {
580         dprintk_rcu("RPC:       destroying %s client for %s\n",
581                         clnt->cl_protname,
582                         rcu_dereference(clnt->cl_xprt)->servername);
583         if (clnt->cl_parent != clnt)
584                 rpc_release_client(clnt->cl_parent);
585         rpc_unregister_client(clnt);
586         rpc_clnt_remove_pipedir(clnt);
587         rpc_free_iostats(clnt->cl_metrics);
588         kfree(clnt->cl_principal);
589         clnt->cl_metrics = NULL;
590         xprt_put(rcu_dereference_raw(clnt->cl_xprt));
591         rpciod_down();
592         kfree(clnt);
593 }
594
595 /*
596  * Free an RPC client
597  */
598 static void
599 rpc_free_auth(struct rpc_clnt *clnt)
600 {
601         if (clnt->cl_auth == NULL) {
602                 rpc_free_client(clnt);
603                 return;
604         }
605
606         /*
607          * Note: RPCSEC_GSS may need to send NULL RPC calls in order to
608          *       release remaining GSS contexts. This mechanism ensures
609          *       that it can do so safely.
610          */
611         atomic_inc(&clnt->cl_count);
612         rpcauth_release(clnt->cl_auth);
613         clnt->cl_auth = NULL;
614         if (atomic_dec_and_test(&clnt->cl_count))
615                 rpc_free_client(clnt);
616 }
617
618 /*
619  * Release reference to the RPC client
620  */
621 void
622 rpc_release_client(struct rpc_clnt *clnt)
623 {
624         dprintk("RPC:       rpc_release_client(%p)\n", clnt);
625
626         if (list_empty(&clnt->cl_tasks))
627                 wake_up(&destroy_wait);
628         if (atomic_dec_and_test(&clnt->cl_count))
629                 rpc_free_auth(clnt);
630 }
631
632 /**
633  * rpc_bind_new_program - bind a new RPC program to an existing client
634  * @old: old rpc_client
635  * @program: rpc program to set
636  * @vers: rpc program version
637  *
638  * Clones the rpc client and sets up a new RPC program. This is mainly
639  * of use for enabling different RPC programs to share the same transport.
640  * The Sun NFSv2/v3 ACL protocol can do this.
641  */
642 struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
643                                       const struct rpc_program *program,
644                                       u32 vers)
645 {
646         struct rpc_clnt *clnt;
647         const struct rpc_version *version;
648         int err;
649
650         BUG_ON(vers >= program->nrvers || !program->version[vers]);
651         version = program->version[vers];
652         clnt = rpc_clone_client(old);
653         if (IS_ERR(clnt))
654                 goto out;
655         clnt->cl_procinfo = version->procs;
656         clnt->cl_maxproc  = version->nrprocs;
657         clnt->cl_protname = program->name;
658         clnt->cl_prog     = program->number;
659         clnt->cl_vers     = version->number;
660         clnt->cl_stats    = program->stats;
661         err = rpc_ping(clnt);
662         if (err != 0) {
663                 rpc_shutdown_client(clnt);
664                 clnt = ERR_PTR(err);
665         }
666 out:
667         return clnt;
668 }
669 EXPORT_SYMBOL_GPL(rpc_bind_new_program);
670
671 void rpc_task_release_client(struct rpc_task *task)
672 {
673         struct rpc_clnt *clnt = task->tk_client;
674
675         if (clnt != NULL) {
676                 /* Remove from client task list */
677                 spin_lock(&clnt->cl_lock);
678                 list_del(&task->tk_task);
679                 spin_unlock(&clnt->cl_lock);
680                 task->tk_client = NULL;
681
682                 rpc_release_client(clnt);
683         }
684 }
685
686 static
687 void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
688 {
689         if (clnt != NULL) {
690                 rpc_task_release_client(task);
691                 task->tk_client = clnt;
692                 atomic_inc(&clnt->cl_count);
693                 if (clnt->cl_softrtry)
694                         task->tk_flags |= RPC_TASK_SOFT;
695                 /* Add to the client's list of all tasks */
696                 spin_lock(&clnt->cl_lock);
697                 list_add_tail(&task->tk_task, &clnt->cl_tasks);
698                 spin_unlock(&clnt->cl_lock);
699         }
700 }
701
702 void rpc_task_reset_client(struct rpc_task *task, struct rpc_clnt *clnt)
703 {
704         rpc_task_release_client(task);
705         rpc_task_set_client(task, clnt);
706 }
707 EXPORT_SYMBOL_GPL(rpc_task_reset_client);
708
709
710 static void
711 rpc_task_set_rpc_message(struct rpc_task *task, const struct rpc_message *msg)
712 {
713         if (msg != NULL) {
714                 task->tk_msg.rpc_proc = msg->rpc_proc;
715                 task->tk_msg.rpc_argp = msg->rpc_argp;
716                 task->tk_msg.rpc_resp = msg->rpc_resp;
717                 if (msg->rpc_cred != NULL)
718                         task->tk_msg.rpc_cred = get_rpccred(msg->rpc_cred);
719         }
720 }
721
722 /*
723  * Default callback for async RPC calls
724  */
725 static void
726 rpc_default_callback(struct rpc_task *task, void *data)
727 {
728 }
729
730 static const struct rpc_call_ops rpc_default_ops = {
731         .rpc_call_done = rpc_default_callback,
732 };
733
734 /**
735  * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
736  * @task_setup_data: pointer to task initialisation data
737  */
738 struct rpc_task *rpc_run_task(const struct rpc_task_setup *task_setup_data)
739 {
740         struct rpc_task *task;
741
742         task = rpc_new_task(task_setup_data);
743         if (IS_ERR(task))
744                 goto out;
745
746         rpc_task_set_client(task, task_setup_data->rpc_client);
747         rpc_task_set_rpc_message(task, task_setup_data->rpc_message);
748
749         if (task->tk_action == NULL)
750                 rpc_call_start(task);
751
752         atomic_inc(&task->tk_count);
753         rpc_execute(task);
754 out:
755         return task;
756 }
757 EXPORT_SYMBOL_GPL(rpc_run_task);
758
759 /**
760  * rpc_call_sync - Perform a synchronous RPC call
761  * @clnt: pointer to RPC client
762  * @msg: RPC call parameters
763  * @flags: RPC call flags
764  */
765 int rpc_call_sync(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags)
766 {
767         struct rpc_task *task;
768         struct rpc_task_setup task_setup_data = {
769                 .rpc_client = clnt,
770                 .rpc_message = msg,
771                 .callback_ops = &rpc_default_ops,
772                 .flags = flags,
773         };
774         int status;
775
776         BUG_ON(flags & RPC_TASK_ASYNC);
777
778         task = rpc_run_task(&task_setup_data);
779         if (IS_ERR(task))
780                 return PTR_ERR(task);
781         status = task->tk_status;
782         rpc_put_task(task);
783         return status;
784 }
785 EXPORT_SYMBOL_GPL(rpc_call_sync);
786
787 /**
788  * rpc_call_async - Perform an asynchronous RPC call
789  * @clnt: pointer to RPC client
790  * @msg: RPC call parameters
791  * @flags: RPC call flags
792  * @tk_ops: RPC call ops
793  * @data: user call data
794  */
795 int
796 rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
797                const struct rpc_call_ops *tk_ops, void *data)
798 {
799         struct rpc_task *task;
800         struct rpc_task_setup task_setup_data = {
801                 .rpc_client = clnt,
802                 .rpc_message = msg,
803                 .callback_ops = tk_ops,
804                 .callback_data = data,
805                 .flags = flags|RPC_TASK_ASYNC,
806         };
807
808         task = rpc_run_task(&task_setup_data);
809         if (IS_ERR(task))
810                 return PTR_ERR(task);
811         rpc_put_task(task);
812         return 0;
813 }
814 EXPORT_SYMBOL_GPL(rpc_call_async);
815
816 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
817 /**
818  * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run
819  * rpc_execute against it
820  * @req: RPC request
821  * @tk_ops: RPC call ops
822  */
823 struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req,
824                                 const struct rpc_call_ops *tk_ops)
825 {
826         struct rpc_task *task;
827         struct xdr_buf *xbufp = &req->rq_snd_buf;
828         struct rpc_task_setup task_setup_data = {
829                 .callback_ops = tk_ops,
830         };
831
832         dprintk("RPC: rpc_run_bc_task req= %p\n", req);
833         /*
834          * Create an rpc_task to send the data
835          */
836         task = rpc_new_task(&task_setup_data);
837         if (IS_ERR(task)) {
838                 xprt_free_bc_request(req);
839                 goto out;
840         }
841         task->tk_rqstp = req;
842
843         /*
844          * Set up the xdr_buf length.
845          * This also indicates that the buffer is XDR encoded already.
846          */
847         xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
848                         xbufp->tail[0].iov_len;
849
850         task->tk_action = call_bc_transmit;
851         atomic_inc(&task->tk_count);
852         BUG_ON(atomic_read(&task->tk_count) != 2);
853         rpc_execute(task);
854
855 out:
856         dprintk("RPC: rpc_run_bc_task: task= %p\n", task);
857         return task;
858 }
859 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
860
861 void
862 rpc_call_start(struct rpc_task *task)
863 {
864         task->tk_action = call_start;
865 }
866 EXPORT_SYMBOL_GPL(rpc_call_start);
867
868 /**
869  * rpc_peeraddr - extract remote peer address from clnt's xprt
870  * @clnt: RPC client structure
871  * @buf: target buffer
872  * @bufsize: length of target buffer
873  *
874  * Returns the number of bytes that are actually in the stored address.
875  */
876 size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
877 {
878         size_t bytes;
879         struct rpc_xprt *xprt;
880
881         rcu_read_lock();
882         xprt = rcu_dereference(clnt->cl_xprt);
883
884         bytes = xprt->addrlen;
885         if (bytes > bufsize)
886                 bytes = bufsize;
887         memcpy(buf, &xprt->addr, bytes);
888         rcu_read_unlock();
889
890         return bytes;
891 }
892 EXPORT_SYMBOL_GPL(rpc_peeraddr);
893
894 /**
895  * rpc_peeraddr2str - return remote peer address in printable format
896  * @clnt: RPC client structure
897  * @format: address format
898  *
899  * NB: the lifetime of the memory referenced by the returned pointer is
900  * the same as the rpc_xprt itself.  As long as the caller uses this
901  * pointer, it must hold the RCU read lock.
902  */
903 const char *rpc_peeraddr2str(struct rpc_clnt *clnt,
904                              enum rpc_display_format_t format)
905 {
906         struct rpc_xprt *xprt;
907
908         xprt = rcu_dereference(clnt->cl_xprt);
909
910         if (xprt->address_strings[format] != NULL)
911                 return xprt->address_strings[format];
912         else
913                 return "unprintable";
914 }
915 EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
916
917 static const struct sockaddr_in rpc_inaddr_loopback = {
918         .sin_family             = AF_INET,
919         .sin_addr.s_addr        = htonl(INADDR_ANY),
920 };
921
922 static const struct sockaddr_in6 rpc_in6addr_loopback = {
923         .sin6_family            = AF_INET6,
924         .sin6_addr              = IN6ADDR_ANY_INIT,
925 };
926
927 /*
928  * Try a getsockname() on a connected datagram socket.  Using a
929  * connected datagram socket prevents leaving a socket in TIME_WAIT.
930  * This conserves the ephemeral port number space.
931  *
932  * Returns zero and fills in "buf" if successful; otherwise, a
933  * negative errno is returned.
934  */
935 static int rpc_sockname(struct net *net, struct sockaddr *sap, size_t salen,
936                         struct sockaddr *buf, int buflen)
937 {
938         struct socket *sock;
939         int err;
940
941         err = __sock_create(net, sap->sa_family,
942                                 SOCK_DGRAM, IPPROTO_UDP, &sock, 1);
943         if (err < 0) {
944                 dprintk("RPC:       can't create UDP socket (%d)\n", err);
945                 goto out;
946         }
947
948         switch (sap->sa_family) {
949         case AF_INET:
950                 err = kernel_bind(sock,
951                                 (struct sockaddr *)&rpc_inaddr_loopback,
952                                 sizeof(rpc_inaddr_loopback));
953                 break;
954         case AF_INET6:
955                 err = kernel_bind(sock,
956                                 (struct sockaddr *)&rpc_in6addr_loopback,
957                                 sizeof(rpc_in6addr_loopback));
958                 break;
959         default:
960                 err = -EAFNOSUPPORT;
961                 goto out;
962         }
963         if (err < 0) {
964                 dprintk("RPC:       can't bind UDP socket (%d)\n", err);
965                 goto out_release;
966         }
967
968         err = kernel_connect(sock, sap, salen, 0);
969         if (err < 0) {
970                 dprintk("RPC:       can't connect UDP socket (%d)\n", err);
971                 goto out_release;
972         }
973
974         err = kernel_getsockname(sock, buf, &buflen);
975         if (err < 0) {
976                 dprintk("RPC:       getsockname failed (%d)\n", err);
977                 goto out_release;
978         }
979
980         err = 0;
981         if (buf->sa_family == AF_INET6) {
982                 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)buf;
983                 sin6->sin6_scope_id = 0;
984         }
985         dprintk("RPC:       %s succeeded\n", __func__);
986
987 out_release:
988         sock_release(sock);
989 out:
990         return err;
991 }
992
993 /*
994  * Scraping a connected socket failed, so we don't have a useable
995  * local address.  Fallback: generate an address that will prevent
996  * the server from calling us back.
997  *
998  * Returns zero and fills in "buf" if successful; otherwise, a
999  * negative errno is returned.
1000  */
1001 static int rpc_anyaddr(int family, struct sockaddr *buf, size_t buflen)
1002 {
1003         switch (family) {
1004         case AF_INET:
1005                 if (buflen < sizeof(rpc_inaddr_loopback))
1006                         return -EINVAL;
1007                 memcpy(buf, &rpc_inaddr_loopback,
1008                                 sizeof(rpc_inaddr_loopback));
1009                 break;
1010         case AF_INET6:
1011                 if (buflen < sizeof(rpc_in6addr_loopback))
1012                         return -EINVAL;
1013                 memcpy(buf, &rpc_in6addr_loopback,
1014                                 sizeof(rpc_in6addr_loopback));
1015         default:
1016                 dprintk("RPC:       %s: address family not supported\n",
1017                         __func__);
1018                 return -EAFNOSUPPORT;
1019         }
1020         dprintk("RPC:       %s: succeeded\n", __func__);
1021         return 0;
1022 }
1023
1024 /**
1025  * rpc_localaddr - discover local endpoint address for an RPC client
1026  * @clnt: RPC client structure
1027  * @buf: target buffer
1028  * @buflen: size of target buffer, in bytes
1029  *
1030  * Returns zero and fills in "buf" and "buflen" if successful;
1031  * otherwise, a negative errno is returned.
1032  *
1033  * This works even if the underlying transport is not currently connected,
1034  * or if the upper layer never previously provided a source address.
1035  *
1036  * The result of this function call is transient: multiple calls in
1037  * succession may give different results, depending on how local
1038  * networking configuration changes over time.
1039  */
1040 int rpc_localaddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t buflen)
1041 {
1042         struct sockaddr_storage address;
1043         struct sockaddr *sap = (struct sockaddr *)&address;
1044         struct rpc_xprt *xprt;
1045         struct net *net;
1046         size_t salen;
1047         int err;
1048
1049         rcu_read_lock();
1050         xprt = rcu_dereference(clnt->cl_xprt);
1051         salen = xprt->addrlen;
1052         memcpy(sap, &xprt->addr, salen);
1053         net = get_net(xprt->xprt_net);
1054         rcu_read_unlock();
1055
1056         rpc_set_port(sap, 0);
1057         err = rpc_sockname(net, sap, salen, buf, buflen);
1058         put_net(net);
1059         if (err != 0)
1060                 /* Couldn't discover local address, return ANYADDR */
1061                 return rpc_anyaddr(sap->sa_family, buf, buflen);
1062         return 0;
1063 }
1064 EXPORT_SYMBOL_GPL(rpc_localaddr);
1065
1066 void
1067 rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
1068 {
1069         struct rpc_xprt *xprt;
1070
1071         rcu_read_lock();
1072         xprt = rcu_dereference(clnt->cl_xprt);
1073         if (xprt->ops->set_buffer_size)
1074                 xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
1075         rcu_read_unlock();
1076 }
1077 EXPORT_SYMBOL_GPL(rpc_setbufsize);
1078
1079 /**
1080  * rpc_protocol - Get transport protocol number for an RPC client
1081  * @clnt: RPC client to query
1082  *
1083  */
1084 int rpc_protocol(struct rpc_clnt *clnt)
1085 {
1086         int protocol;
1087
1088         rcu_read_lock();
1089         protocol = rcu_dereference(clnt->cl_xprt)->prot;
1090         rcu_read_unlock();
1091         return protocol;
1092 }
1093 EXPORT_SYMBOL_GPL(rpc_protocol);
1094
1095 /**
1096  * rpc_net_ns - Get the network namespace for this RPC client
1097  * @clnt: RPC client to query
1098  *
1099  */
1100 struct net *rpc_net_ns(struct rpc_clnt *clnt)
1101 {
1102         struct net *ret;
1103
1104         rcu_read_lock();
1105         ret = rcu_dereference(clnt->cl_xprt)->xprt_net;
1106         rcu_read_unlock();
1107         return ret;
1108 }
1109 EXPORT_SYMBOL_GPL(rpc_net_ns);
1110
1111 /**
1112  * rpc_max_payload - Get maximum payload size for a transport, in bytes
1113  * @clnt: RPC client to query
1114  *
1115  * For stream transports, this is one RPC record fragment (see RFC
1116  * 1831), as we don't support multi-record requests yet.  For datagram
1117  * transports, this is the size of an IP packet minus the IP, UDP, and
1118  * RPC header sizes.
1119  */
1120 size_t rpc_max_payload(struct rpc_clnt *clnt)
1121 {
1122         size_t ret;
1123
1124         rcu_read_lock();
1125         ret = rcu_dereference(clnt->cl_xprt)->max_payload;
1126         rcu_read_unlock();
1127         return ret;
1128 }
1129 EXPORT_SYMBOL_GPL(rpc_max_payload);
1130
1131 /**
1132  * rpc_force_rebind - force transport to check that remote port is unchanged
1133  * @clnt: client to rebind
1134  *
1135  */
1136 void rpc_force_rebind(struct rpc_clnt *clnt)
1137 {
1138         if (clnt->cl_autobind) {
1139                 rcu_read_lock();
1140                 xprt_clear_bound(rcu_dereference(clnt->cl_xprt));
1141                 rcu_read_unlock();
1142         }
1143 }
1144 EXPORT_SYMBOL_GPL(rpc_force_rebind);
1145
1146 /*
1147  * Restart an (async) RPC call from the call_prepare state.
1148  * Usually called from within the exit handler.
1149  */
1150 int
1151 rpc_restart_call_prepare(struct rpc_task *task)
1152 {
1153         if (RPC_ASSASSINATED(task))
1154                 return 0;
1155         task->tk_action = call_start;
1156         if (task->tk_ops->rpc_call_prepare != NULL)
1157                 task->tk_action = rpc_prepare_task;
1158         return 1;
1159 }
1160 EXPORT_SYMBOL_GPL(rpc_restart_call_prepare);
1161
1162 /*
1163  * Restart an (async) RPC call. Usually called from within the
1164  * exit handler.
1165  */
1166 int
1167 rpc_restart_call(struct rpc_task *task)
1168 {
1169         if (RPC_ASSASSINATED(task))
1170                 return 0;
1171         task->tk_action = call_start;
1172         return 1;
1173 }
1174 EXPORT_SYMBOL_GPL(rpc_restart_call);
1175
1176 #ifdef RPC_DEBUG
1177 static const char *rpc_proc_name(const struct rpc_task *task)
1178 {
1179         const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1180
1181         if (proc) {
1182                 if (proc->p_name)
1183                         return proc->p_name;
1184                 else
1185                         return "NULL";
1186         } else
1187                 return "no proc";
1188 }
1189 #endif
1190
1191 /*
1192  * 0.  Initial state
1193  *
1194  *     Other FSM states can be visited zero or more times, but
1195  *     this state is visited exactly once for each RPC.
1196  */
1197 static void
1198 call_start(struct rpc_task *task)
1199 {
1200         struct rpc_clnt *clnt = task->tk_client;
1201
1202         dprintk("RPC: %5u call_start %s%d proc %s (%s)\n", task->tk_pid,
1203                         clnt->cl_protname, clnt->cl_vers,
1204                         rpc_proc_name(task),
1205                         (RPC_IS_ASYNC(task) ? "async" : "sync"));
1206
1207         /* Increment call count */
1208         task->tk_msg.rpc_proc->p_count++;
1209         clnt->cl_stats->rpccnt++;
1210         task->tk_action = call_reserve;
1211 }
1212
1213 /*
1214  * 1.   Reserve an RPC call slot
1215  */
1216 static void
1217 call_reserve(struct rpc_task *task)
1218 {
1219         dprint_status(task);
1220
1221         task->tk_status  = 0;
1222         task->tk_action  = call_reserveresult;
1223         xprt_reserve(task);
1224 }
1225
1226 /*
1227  * 1b.  Grok the result of xprt_reserve()
1228  */
1229 static void
1230 call_reserveresult(struct rpc_task *task)
1231 {
1232         int status = task->tk_status;
1233
1234         dprint_status(task);
1235
1236         /*
1237          * After a call to xprt_reserve(), we must have either
1238          * a request slot or else an error status.
1239          */
1240         task->tk_status = 0;
1241         if (status >= 0) {
1242                 if (task->tk_rqstp) {
1243                         task->tk_action = call_refresh;
1244                         return;
1245                 }
1246
1247                 printk(KERN_ERR "%s: status=%d, but no request slot, exiting\n",
1248                                 __func__, status);
1249                 rpc_exit(task, -EIO);
1250                 return;
1251         }
1252
1253         /*
1254          * Even though there was an error, we may have acquired
1255          * a request slot somehow.  Make sure not to leak it.
1256          */
1257         if (task->tk_rqstp) {
1258                 printk(KERN_ERR "%s: status=%d, request allocated anyway\n",
1259                                 __func__, status);
1260                 xprt_release(task);
1261         }
1262
1263         switch (status) {
1264         case -EAGAIN:   /* woken up; retry */
1265                 task->tk_action = call_reserve;
1266                 return;
1267         case -EIO:      /* probably a shutdown */
1268                 break;
1269         default:
1270                 printk(KERN_ERR "%s: unrecognized error %d, exiting\n",
1271                                 __func__, status);
1272                 break;
1273         }
1274         rpc_exit(task, status);
1275 }
1276
1277 /*
1278  * 2.   Bind and/or refresh the credentials
1279  */
1280 static void
1281 call_refresh(struct rpc_task *task)
1282 {
1283         dprint_status(task);
1284
1285         task->tk_action = call_refreshresult;
1286         task->tk_status = 0;
1287         task->tk_client->cl_stats->rpcauthrefresh++;
1288         rpcauth_refreshcred(task);
1289 }
1290
1291 /*
1292  * 2a.  Process the results of a credential refresh
1293  */
1294 static void
1295 call_refreshresult(struct rpc_task *task)
1296 {
1297         int status = task->tk_status;
1298
1299         dprint_status(task);
1300
1301         task->tk_status = 0;
1302         task->tk_action = call_refresh;
1303         switch (status) {
1304         case 0:
1305                 if (rpcauth_uptodatecred(task))
1306                         task->tk_action = call_allocate;
1307                 return;
1308         case -ETIMEDOUT:
1309                 rpc_delay(task, 3*HZ);
1310         case -EAGAIN:
1311                 status = -EACCES;
1312                 if (!task->tk_cred_retry)
1313                         break;
1314                 task->tk_cred_retry--;
1315                 dprintk("RPC: %5u %s: retry refresh creds\n",
1316                                 task->tk_pid, __func__);
1317                 return;
1318         }
1319         dprintk("RPC: %5u %s: refresh creds failed with error %d\n",
1320                                 task->tk_pid, __func__, status);
1321         rpc_exit(task, status);
1322 }
1323
1324 /*
1325  * 2b.  Allocate the buffer. For details, see sched.c:rpc_malloc.
1326  *      (Note: buffer memory is freed in xprt_release).
1327  */
1328 static void
1329 call_allocate(struct rpc_task *task)
1330 {
1331         unsigned int slack = task->tk_rqstp->rq_cred->cr_auth->au_cslack;
1332         struct rpc_rqst *req = task->tk_rqstp;
1333         struct rpc_xprt *xprt = task->tk_xprt;
1334         struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1335
1336         dprint_status(task);
1337
1338         task->tk_status = 0;
1339         task->tk_action = call_bind;
1340
1341         if (req->rq_buffer)
1342                 return;
1343
1344         if (proc->p_proc != 0) {
1345                 BUG_ON(proc->p_arglen == 0);
1346                 if (proc->p_decode != NULL)
1347                         BUG_ON(proc->p_replen == 0);
1348         }
1349
1350         /*
1351          * Calculate the size (in quads) of the RPC call
1352          * and reply headers, and convert both values
1353          * to byte sizes.
1354          */
1355         req->rq_callsize = RPC_CALLHDRSIZE + (slack << 1) + proc->p_arglen;
1356         req->rq_callsize <<= 2;
1357         req->rq_rcvsize = RPC_REPHDRSIZE + slack + proc->p_replen;
1358         req->rq_rcvsize <<= 2;
1359
1360         req->rq_buffer = xprt->ops->buf_alloc(task,
1361                                         req->rq_callsize + req->rq_rcvsize);
1362         if (req->rq_buffer != NULL)
1363                 return;
1364
1365         dprintk("RPC: %5u rpc_buffer allocation failed\n", task->tk_pid);
1366
1367         if (RPC_IS_ASYNC(task) || !fatal_signal_pending(current)) {
1368                 task->tk_action = call_allocate;
1369                 rpc_delay(task, HZ>>4);
1370                 return;
1371         }
1372
1373         rpc_exit(task, -ERESTARTSYS);
1374 }
1375
1376 static inline int
1377 rpc_task_need_encode(struct rpc_task *task)
1378 {
1379         return task->tk_rqstp->rq_snd_buf.len == 0;
1380 }
1381
1382 static inline void
1383 rpc_task_force_reencode(struct rpc_task *task)
1384 {
1385         task->tk_rqstp->rq_snd_buf.len = 0;
1386         task->tk_rqstp->rq_bytes_sent = 0;
1387 }
1388
1389 static inline void
1390 rpc_xdr_buf_init(struct xdr_buf *buf, void *start, size_t len)
1391 {
1392         buf->head[0].iov_base = start;
1393         buf->head[0].iov_len = len;
1394         buf->tail[0].iov_len = 0;
1395         buf->page_len = 0;
1396         buf->flags = 0;
1397         buf->len = 0;
1398         buf->buflen = len;
1399 }
1400
1401 /*
1402  * 3.   Encode arguments of an RPC call
1403  */
1404 static void
1405 rpc_xdr_encode(struct rpc_task *task)
1406 {
1407         struct rpc_rqst *req = task->tk_rqstp;
1408         kxdreproc_t     encode;
1409         __be32          *p;
1410
1411         dprint_status(task);
1412
1413         rpc_xdr_buf_init(&req->rq_snd_buf,
1414                          req->rq_buffer,
1415                          req->rq_callsize);
1416         rpc_xdr_buf_init(&req->rq_rcv_buf,
1417                          (char *)req->rq_buffer + req->rq_callsize,
1418                          req->rq_rcvsize);
1419
1420         p = rpc_encode_header(task);
1421         if (p == NULL) {
1422                 printk(KERN_INFO "RPC: couldn't encode RPC header, exit EIO\n");
1423                 rpc_exit(task, -EIO);
1424                 return;
1425         }
1426
1427         encode = task->tk_msg.rpc_proc->p_encode;
1428         if (encode == NULL)
1429                 return;
1430
1431         task->tk_status = rpcauth_wrap_req(task, encode, req, p,
1432                         task->tk_msg.rpc_argp);
1433 }
1434
1435 /*
1436  * 4.   Get the server port number if not yet set
1437  */
1438 static void
1439 call_bind(struct rpc_task *task)
1440 {
1441         struct rpc_xprt *xprt = task->tk_xprt;
1442
1443         dprint_status(task);
1444
1445         task->tk_action = call_connect;
1446         if (!xprt_bound(xprt)) {
1447                 task->tk_action = call_bind_status;
1448                 task->tk_timeout = xprt->bind_timeout;
1449                 xprt->ops->rpcbind(task);
1450         }
1451 }
1452
1453 /*
1454  * 4a.  Sort out bind result
1455  */
1456 static void
1457 call_bind_status(struct rpc_task *task)
1458 {
1459         int status = -EIO;
1460
1461         if (task->tk_status >= 0) {
1462                 dprint_status(task);
1463                 task->tk_status = 0;
1464                 task->tk_action = call_connect;
1465                 return;
1466         }
1467
1468         trace_rpc_bind_status(task);
1469         switch (task->tk_status) {
1470         case -ENOMEM:
1471                 dprintk("RPC: %5u rpcbind out of memory\n", task->tk_pid);
1472                 rpc_delay(task, HZ >> 2);
1473                 goto retry_timeout;
1474         case -EACCES:
1475                 dprintk("RPC: %5u remote rpcbind: RPC program/version "
1476                                 "unavailable\n", task->tk_pid);
1477                 /* fail immediately if this is an RPC ping */
1478                 if (task->tk_msg.rpc_proc->p_proc == 0) {
1479                         status = -EOPNOTSUPP;
1480                         break;
1481                 }
1482                 if (task->tk_rebind_retry == 0)
1483                         break;
1484                 task->tk_rebind_retry--;
1485                 rpc_delay(task, 3*HZ);
1486                 goto retry_timeout;
1487         case -ETIMEDOUT:
1488                 dprintk("RPC: %5u rpcbind request timed out\n",
1489                                 task->tk_pid);
1490                 goto retry_timeout;
1491         case -EPFNOSUPPORT:
1492                 /* server doesn't support any rpcbind version we know of */
1493                 dprintk("RPC: %5u unrecognized remote rpcbind service\n",
1494                                 task->tk_pid);
1495                 break;
1496         case -EPROTONOSUPPORT:
1497                 dprintk("RPC: %5u remote rpcbind version unavailable, retrying\n",
1498                                 task->tk_pid);
1499                 task->tk_status = 0;
1500                 task->tk_action = call_bind;
1501                 return;
1502         case -ECONNREFUSED:             /* connection problems */
1503         case -ECONNRESET:
1504         case -ENOTCONN:
1505         case -EHOSTDOWN:
1506         case -EHOSTUNREACH:
1507         case -ENETUNREACH:
1508         case -EPIPE:
1509                 dprintk("RPC: %5u remote rpcbind unreachable: %d\n",
1510                                 task->tk_pid, task->tk_status);
1511                 if (!RPC_IS_SOFTCONN(task)) {
1512                         rpc_delay(task, 5*HZ);
1513                         goto retry_timeout;
1514                 }
1515                 status = task->tk_status;
1516                 break;
1517         default:
1518                 dprintk("RPC: %5u unrecognized rpcbind error (%d)\n",
1519                                 task->tk_pid, -task->tk_status);
1520         }
1521
1522         rpc_exit(task, status);
1523         return;
1524
1525 retry_timeout:
1526         task->tk_action = call_timeout;
1527 }
1528
1529 /*
1530  * 4b.  Connect to the RPC server
1531  */
1532 static void
1533 call_connect(struct rpc_task *task)
1534 {
1535         struct rpc_xprt *xprt = task->tk_xprt;
1536
1537         dprintk("RPC: %5u call_connect xprt %p %s connected\n",
1538                         task->tk_pid, xprt,
1539                         (xprt_connected(xprt) ? "is" : "is not"));
1540
1541         task->tk_action = call_transmit;
1542         if (!xprt_connected(xprt)) {
1543                 task->tk_action = call_connect_status;
1544                 if (task->tk_status < 0)
1545                         return;
1546                 xprt_connect(task);
1547         }
1548 }
1549
1550 /*
1551  * 4c.  Sort out connect result
1552  */
1553 static void
1554 call_connect_status(struct rpc_task *task)
1555 {
1556         struct rpc_clnt *clnt = task->tk_client;
1557         int status = task->tk_status;
1558
1559         dprint_status(task);
1560
1561         task->tk_status = 0;
1562         if (status >= 0 || status == -EAGAIN) {
1563                 clnt->cl_stats->netreconn++;
1564                 task->tk_action = call_transmit;
1565                 return;
1566         }
1567
1568         trace_rpc_connect_status(task, status);
1569         switch (status) {
1570                 /* if soft mounted, test if we've timed out */
1571         case -ETIMEDOUT:
1572                 task->tk_action = call_timeout;
1573                 break;
1574         default:
1575                 rpc_exit(task, -EIO);
1576         }
1577 }
1578
1579 /*
1580  * 5.   Transmit the RPC request, and wait for reply
1581  */
1582 static void
1583 call_transmit(struct rpc_task *task)
1584 {
1585         dprint_status(task);
1586
1587         task->tk_action = call_status;
1588         if (task->tk_status < 0)
1589                 return;
1590         task->tk_status = xprt_prepare_transmit(task);
1591         if (task->tk_status != 0)
1592                 return;
1593         task->tk_action = call_transmit_status;
1594         /* Encode here so that rpcsec_gss can use correct sequence number. */
1595         if (rpc_task_need_encode(task)) {
1596                 BUG_ON(task->tk_rqstp->rq_bytes_sent != 0);
1597                 rpc_xdr_encode(task);
1598                 /* Did the encode result in an error condition? */
1599                 if (task->tk_status != 0) {
1600                         /* Was the error nonfatal? */
1601                         if (task->tk_status == -EAGAIN)
1602                                 rpc_delay(task, HZ >> 4);
1603                         else
1604                                 rpc_exit(task, task->tk_status);
1605                         return;
1606                 }
1607         }
1608         xprt_transmit(task);
1609         if (task->tk_status < 0)
1610                 return;
1611         /*
1612          * On success, ensure that we call xprt_end_transmit() before sleeping
1613          * in order to allow access to the socket to other RPC requests.
1614          */
1615         call_transmit_status(task);
1616         if (rpc_reply_expected(task))
1617                 return;
1618         task->tk_action = rpc_exit_task;
1619         rpc_wake_up_queued_task(&task->tk_xprt->pending, task);
1620 }
1621
1622 /*
1623  * 5a.  Handle cleanup after a transmission
1624  */
1625 static void
1626 call_transmit_status(struct rpc_task *task)
1627 {
1628         task->tk_action = call_status;
1629
1630         /*
1631          * Common case: success.  Force the compiler to put this
1632          * test first.
1633          */
1634         if (task->tk_status == 0) {
1635                 xprt_end_transmit(task);
1636                 rpc_task_force_reencode(task);
1637                 return;
1638         }
1639
1640         switch (task->tk_status) {
1641         case -EAGAIN:
1642                 break;
1643         default:
1644                 dprint_status(task);
1645                 xprt_end_transmit(task);
1646                 rpc_task_force_reencode(task);
1647                 break;
1648                 /*
1649                  * Special cases: if we've been waiting on the
1650                  * socket's write_space() callback, or if the
1651                  * socket just returned a connection error,
1652                  * then hold onto the transport lock.
1653                  */
1654         case -ECONNREFUSED:
1655         case -EHOSTDOWN:
1656         case -EHOSTUNREACH:
1657         case -ENETUNREACH:
1658                 if (RPC_IS_SOFTCONN(task)) {
1659                         xprt_end_transmit(task);
1660                         rpc_exit(task, task->tk_status);
1661                         break;
1662                 }
1663         case -ECONNRESET:
1664         case -ENOTCONN:
1665         case -EPIPE:
1666                 rpc_task_force_reencode(task);
1667         }
1668 }
1669
1670 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
1671 /*
1672  * 5b.  Send the backchannel RPC reply.  On error, drop the reply.  In
1673  * addition, disconnect on connectivity errors.
1674  */
1675 static void
1676 call_bc_transmit(struct rpc_task *task)
1677 {
1678         struct rpc_rqst *req = task->tk_rqstp;
1679
1680         BUG_ON(task->tk_status != 0);
1681         task->tk_status = xprt_prepare_transmit(task);
1682         if (task->tk_status == -EAGAIN) {
1683                 /*
1684                  * Could not reserve the transport. Try again after the
1685                  * transport is released.
1686                  */
1687                 task->tk_status = 0;
1688                 task->tk_action = call_bc_transmit;
1689                 return;
1690         }
1691
1692         task->tk_action = rpc_exit_task;
1693         if (task->tk_status < 0) {
1694                 printk(KERN_NOTICE "RPC: Could not send backchannel reply "
1695                         "error: %d\n", task->tk_status);
1696                 return;
1697         }
1698
1699         xprt_transmit(task);
1700         xprt_end_transmit(task);
1701         dprint_status(task);
1702         switch (task->tk_status) {
1703         case 0:
1704                 /* Success */
1705                 break;
1706         case -EHOSTDOWN:
1707         case -EHOSTUNREACH:
1708         case -ENETUNREACH:
1709         case -ETIMEDOUT:
1710                 /*
1711                  * Problem reaching the server.  Disconnect and let the
1712                  * forechannel reestablish the connection.  The server will
1713                  * have to retransmit the backchannel request and we'll
1714                  * reprocess it.  Since these ops are idempotent, there's no
1715                  * need to cache our reply at this time.
1716                  */
1717                 printk(KERN_NOTICE "RPC: Could not send backchannel reply "
1718                         "error: %d\n", task->tk_status);
1719                 xprt_conditional_disconnect(task->tk_xprt,
1720                         req->rq_connect_cookie);
1721                 break;
1722         default:
1723                 /*
1724                  * We were unable to reply and will have to drop the
1725                  * request.  The server should reconnect and retransmit.
1726                  */
1727                 BUG_ON(task->tk_status == -EAGAIN);
1728                 printk(KERN_NOTICE "RPC: Could not send backchannel reply "
1729                         "error: %d\n", task->tk_status);
1730                 break;
1731         }
1732         rpc_wake_up_queued_task(&req->rq_xprt->pending, task);
1733 }
1734 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
1735
1736 /*
1737  * 6.   Sort out the RPC call status
1738  */
1739 static void
1740 call_status(struct rpc_task *task)
1741 {
1742         struct rpc_clnt *clnt = task->tk_client;
1743         struct rpc_rqst *req = task->tk_rqstp;
1744         int             status;
1745
1746         if (req->rq_reply_bytes_recvd > 0 && !req->rq_bytes_sent)
1747                 task->tk_status = req->rq_reply_bytes_recvd;
1748
1749         dprint_status(task);
1750
1751         status = task->tk_status;
1752         if (status >= 0) {
1753                 task->tk_action = call_decode;
1754                 return;
1755         }
1756
1757         trace_rpc_call_status(task);
1758         task->tk_status = 0;
1759         switch(status) {
1760         case -EHOSTDOWN:
1761         case -EHOSTUNREACH:
1762         case -ENETUNREACH:
1763                 /*
1764                  * Delay any retries for 3 seconds, then handle as if it
1765                  * were a timeout.
1766                  */
1767                 rpc_delay(task, 3*HZ);
1768         case -ETIMEDOUT:
1769                 task->tk_action = call_timeout;
1770                 if (task->tk_client->cl_discrtry)
1771                         xprt_conditional_disconnect(task->tk_xprt,
1772                                         req->rq_connect_cookie);
1773                 break;
1774         case -ECONNRESET:
1775         case -ECONNREFUSED:
1776                 rpc_force_rebind(clnt);
1777                 rpc_delay(task, 3*HZ);
1778         case -EPIPE:
1779         case -ENOTCONN:
1780                 task->tk_action = call_bind;
1781                 break;
1782         case -EAGAIN:
1783                 task->tk_action = call_transmit;
1784                 break;
1785         case -EIO:
1786                 /* shutdown or soft timeout */
1787                 rpc_exit(task, status);
1788                 break;
1789         default:
1790                 if (clnt->cl_chatty)
1791                         printk("%s: RPC call returned error %d\n",
1792                                clnt->cl_protname, -status);
1793                 rpc_exit(task, status);
1794         }
1795 }
1796
1797 /*
1798  * 6a.  Handle RPC timeout
1799  *      We do not release the request slot, so we keep using the
1800  *      same XID for all retransmits.
1801  */
1802 static void
1803 call_timeout(struct rpc_task *task)
1804 {
1805         struct rpc_clnt *clnt = task->tk_client;
1806
1807         if (xprt_adjust_timeout(task->tk_rqstp) == 0) {
1808                 dprintk("RPC: %5u call_timeout (minor)\n", task->tk_pid);
1809                 goto retry;
1810         }
1811
1812         dprintk("RPC: %5u call_timeout (major)\n", task->tk_pid);
1813         task->tk_timeouts++;
1814
1815         if (RPC_IS_SOFTCONN(task)) {
1816                 rpc_exit(task, -ETIMEDOUT);
1817                 return;
1818         }
1819         if (RPC_IS_SOFT(task)) {
1820                 if (clnt->cl_chatty)
1821                         rcu_read_lock();
1822                         printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
1823                                 clnt->cl_protname,
1824                                 rcu_dereference(clnt->cl_xprt)->servername);
1825                         rcu_read_unlock();
1826                 if (task->tk_flags & RPC_TASK_TIMEOUT)
1827                         rpc_exit(task, -ETIMEDOUT);
1828                 else
1829                         rpc_exit(task, -EIO);
1830                 return;
1831         }
1832
1833         if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
1834                 task->tk_flags |= RPC_CALL_MAJORSEEN;
1835                 if (clnt->cl_chatty) {
1836                         rcu_read_lock();
1837                         printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
1838                         clnt->cl_protname,
1839                         rcu_dereference(clnt->cl_xprt)->servername);
1840                         rcu_read_unlock();
1841                 }
1842         }
1843         rpc_force_rebind(clnt);
1844         /*
1845          * Did our request time out due to an RPCSEC_GSS out-of-sequence
1846          * event? RFC2203 requires the server to drop all such requests.
1847          */
1848         rpcauth_invalcred(task);
1849
1850 retry:
1851         clnt->cl_stats->rpcretrans++;
1852         task->tk_action = call_bind;
1853         task->tk_status = 0;
1854 }
1855
1856 /*
1857  * 7.   Decode the RPC reply
1858  */
1859 static void
1860 call_decode(struct rpc_task *task)
1861 {
1862         struct rpc_clnt *clnt = task->tk_client;
1863         struct rpc_rqst *req = task->tk_rqstp;
1864         kxdrdproc_t     decode = task->tk_msg.rpc_proc->p_decode;
1865         __be32          *p;
1866
1867         dprint_status(task);
1868
1869         if (task->tk_flags & RPC_CALL_MAJORSEEN) {
1870                 if (clnt->cl_chatty) {
1871                         rcu_read_lock();
1872                         printk(KERN_NOTICE "%s: server %s OK\n",
1873                                 clnt->cl_protname,
1874                                 rcu_dereference(clnt->cl_xprt)->servername);
1875                         rcu_read_unlock();
1876                 }
1877                 task->tk_flags &= ~RPC_CALL_MAJORSEEN;
1878         }
1879
1880         /*
1881          * Ensure that we see all writes made by xprt_complete_rqst()
1882          * before it changed req->rq_reply_bytes_recvd.
1883          */
1884         smp_rmb();
1885         req->rq_rcv_buf.len = req->rq_private_buf.len;
1886
1887         /* Check that the softirq receive buffer is valid */
1888         WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
1889                                 sizeof(req->rq_rcv_buf)) != 0);
1890
1891         if (req->rq_rcv_buf.len < 12) {
1892                 if (!RPC_IS_SOFT(task)) {
1893                         task->tk_action = call_bind;
1894                         clnt->cl_stats->rpcretrans++;
1895                         goto out_retry;
1896                 }
1897                 dprintk("RPC:       %s: too small RPC reply size (%d bytes)\n",
1898                                 clnt->cl_protname, task->tk_status);
1899                 task->tk_action = call_timeout;
1900                 goto out_retry;
1901         }
1902
1903         p = rpc_verify_header(task);
1904         if (IS_ERR(p)) {
1905                 if (p == ERR_PTR(-EAGAIN))
1906                         goto out_retry;
1907                 return;
1908         }
1909
1910         task->tk_action = rpc_exit_task;
1911
1912         if (decode) {
1913                 task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
1914                                                       task->tk_msg.rpc_resp);
1915         }
1916         dprintk("RPC: %5u call_decode result %d\n", task->tk_pid,
1917                         task->tk_status);
1918         return;
1919 out_retry:
1920         task->tk_status = 0;
1921         /* Note: rpc_verify_header() may have freed the RPC slot */
1922         if (task->tk_rqstp == req) {
1923                 req->rq_reply_bytes_recvd = req->rq_rcv_buf.len = 0;
1924                 if (task->tk_client->cl_discrtry)
1925                         xprt_conditional_disconnect(task->tk_xprt,
1926                                         req->rq_connect_cookie);
1927         }
1928 }
1929
1930 static __be32 *
1931 rpc_encode_header(struct rpc_task *task)
1932 {
1933         struct rpc_clnt *clnt = task->tk_client;
1934         struct rpc_rqst *req = task->tk_rqstp;
1935         __be32          *p = req->rq_svec[0].iov_base;
1936
1937         /* FIXME: check buffer size? */
1938
1939         p = xprt_skip_transport_header(task->tk_xprt, p);
1940         *p++ = req->rq_xid;             /* XID */
1941         *p++ = htonl(RPC_CALL);         /* CALL */
1942         *p++ = htonl(RPC_VERSION);      /* RPC version */
1943         *p++ = htonl(clnt->cl_prog);    /* program number */
1944         *p++ = htonl(clnt->cl_vers);    /* program version */
1945         *p++ = htonl(task->tk_msg.rpc_proc->p_proc);    /* procedure */
1946         p = rpcauth_marshcred(task, p);
1947         req->rq_slen = xdr_adjust_iovec(&req->rq_svec[0], p);
1948         return p;
1949 }
1950
1951 static __be32 *
1952 rpc_verify_header(struct rpc_task *task)
1953 {
1954         struct rpc_clnt *clnt = task->tk_client;
1955         struct kvec *iov = &task->tk_rqstp->rq_rcv_buf.head[0];
1956         int len = task->tk_rqstp->rq_rcv_buf.len >> 2;
1957         __be32  *p = iov->iov_base;
1958         u32 n;
1959         int error = -EACCES;
1960
1961         if ((task->tk_rqstp->rq_rcv_buf.len & 3) != 0) {
1962                 /* RFC-1014 says that the representation of XDR data must be a
1963                  * multiple of four bytes
1964                  * - if it isn't pointer subtraction in the NFS client may give
1965                  *   undefined results
1966                  */
1967                 dprintk("RPC: %5u %s: XDR representation not a multiple of"
1968                        " 4 bytes: 0x%x\n", task->tk_pid, __func__,
1969                        task->tk_rqstp->rq_rcv_buf.len);
1970                 goto out_eio;
1971         }
1972         if ((len -= 3) < 0)
1973                 goto out_overflow;
1974
1975         p += 1; /* skip XID */
1976         if ((n = ntohl(*p++)) != RPC_REPLY) {
1977                 dprintk("RPC: %5u %s: not an RPC reply: %x\n",
1978                         task->tk_pid, __func__, n);
1979                 goto out_garbage;
1980         }
1981
1982         if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) {
1983                 if (--len < 0)
1984                         goto out_overflow;
1985                 switch ((n = ntohl(*p++))) {
1986                 case RPC_AUTH_ERROR:
1987                         break;
1988                 case RPC_MISMATCH:
1989                         dprintk("RPC: %5u %s: RPC call version mismatch!\n",
1990                                 task->tk_pid, __func__);
1991                         error = -EPROTONOSUPPORT;
1992                         goto out_err;
1993                 default:
1994                         dprintk("RPC: %5u %s: RPC call rejected, "
1995                                 "unknown error: %x\n",
1996                                 task->tk_pid, __func__, n);
1997                         goto out_eio;
1998                 }
1999                 if (--len < 0)
2000                         goto out_overflow;
2001                 switch ((n = ntohl(*p++))) {
2002                 case RPC_AUTH_REJECTEDCRED:
2003                 case RPC_AUTH_REJECTEDVERF:
2004                 case RPCSEC_GSS_CREDPROBLEM:
2005                 case RPCSEC_GSS_CTXPROBLEM:
2006                         if (!task->tk_cred_retry)
2007                                 break;
2008                         task->tk_cred_retry--;
2009                         dprintk("RPC: %5u %s: retry stale creds\n",
2010                                         task->tk_pid, __func__);
2011                         rpcauth_invalcred(task);
2012                         /* Ensure we obtain a new XID! */
2013                         xprt_release(task);
2014                         task->tk_action = call_reserve;
2015                         goto out_retry;
2016                 case RPC_AUTH_BADCRED:
2017                 case RPC_AUTH_BADVERF:
2018                         /* possibly garbled cred/verf? */
2019                         if (!task->tk_garb_retry)
2020                                 break;
2021                         task->tk_garb_retry--;
2022                         dprintk("RPC: %5u %s: retry garbled creds\n",
2023                                         task->tk_pid, __func__);
2024                         task->tk_action = call_bind;
2025                         goto out_retry;
2026                 case RPC_AUTH_TOOWEAK:
2027                         rcu_read_lock();
2028                         printk(KERN_NOTICE "RPC: server %s requires stronger "
2029                                "authentication.\n",
2030                                rcu_dereference(clnt->cl_xprt)->servername);
2031                         rcu_read_unlock();
2032                         break;
2033                 default:
2034                         dprintk("RPC: %5u %s: unknown auth error: %x\n",
2035                                         task->tk_pid, __func__, n);
2036                         error = -EIO;
2037                 }
2038                 dprintk("RPC: %5u %s: call rejected %d\n",
2039                                 task->tk_pid, __func__, n);
2040                 goto out_err;
2041         }
2042         if (!(p = rpcauth_checkverf(task, p))) {
2043                 dprintk("RPC: %5u %s: auth check failed\n",
2044                                 task->tk_pid, __func__);
2045                 goto out_garbage;               /* bad verifier, retry */
2046         }
2047         len = p - (__be32 *)iov->iov_base - 1;
2048         if (len < 0)
2049                 goto out_overflow;
2050         switch ((n = ntohl(*p++))) {
2051         case RPC_SUCCESS:
2052                 return p;
2053         case RPC_PROG_UNAVAIL:
2054                 dprintk_rcu("RPC: %5u %s: program %u is unsupported "
2055                                 "by server %s\n", task->tk_pid, __func__,
2056                                 (unsigned int)clnt->cl_prog,
2057                                 rcu_dereference(clnt->cl_xprt)->servername);
2058                 error = -EPFNOSUPPORT;
2059                 goto out_err;
2060         case RPC_PROG_MISMATCH:
2061                 dprintk_rcu("RPC: %5u %s: program %u, version %u unsupported "
2062                                 "by server %s\n", task->tk_pid, __func__,
2063                                 (unsigned int)clnt->cl_prog,
2064                                 (unsigned int)clnt->cl_vers,
2065                                 rcu_dereference(clnt->cl_xprt)->servername);
2066                 error = -EPROTONOSUPPORT;
2067                 goto out_err;
2068         case RPC_PROC_UNAVAIL:
2069                 dprintk_rcu("RPC: %5u %s: proc %s unsupported by program %u, "
2070                                 "version %u on server %s\n",
2071                                 task->tk_pid, __func__,
2072                                 rpc_proc_name(task),
2073                                 clnt->cl_prog, clnt->cl_vers,
2074                                 rcu_dereference(clnt->cl_xprt)->servername);
2075                 error = -EOPNOTSUPP;
2076                 goto out_err;
2077         case RPC_GARBAGE_ARGS:
2078                 dprintk("RPC: %5u %s: server saw garbage\n",
2079                                 task->tk_pid, __func__);
2080                 break;                  /* retry */
2081         default:
2082                 dprintk("RPC: %5u %s: server accept status: %x\n",
2083                                 task->tk_pid, __func__, n);
2084                 /* Also retry */
2085         }
2086
2087 out_garbage:
2088         clnt->cl_stats->rpcgarbage++;
2089         if (task->tk_garb_retry) {
2090                 task->tk_garb_retry--;
2091                 dprintk("RPC: %5u %s: retrying\n",
2092                                 task->tk_pid, __func__);
2093                 task->tk_action = call_bind;
2094 out_retry:
2095                 return ERR_PTR(-EAGAIN);
2096         }
2097 out_eio:
2098         error = -EIO;
2099 out_err:
2100         rpc_exit(task, error);
2101         dprintk("RPC: %5u %s: call failed with error %d\n", task->tk_pid,
2102                         __func__, error);
2103         return ERR_PTR(error);
2104 out_overflow:
2105         dprintk("RPC: %5u %s: server reply was truncated.\n", task->tk_pid,
2106                         __func__);
2107         goto out_garbage;
2108 }
2109
2110 static void rpcproc_encode_null(void *rqstp, struct xdr_stream *xdr, void *obj)
2111 {
2112 }
2113
2114 static int rpcproc_decode_null(void *rqstp, struct xdr_stream *xdr, void *obj)
2115 {
2116         return 0;
2117 }
2118
2119 static struct rpc_procinfo rpcproc_null = {
2120         .p_encode = rpcproc_encode_null,
2121         .p_decode = rpcproc_decode_null,
2122 };
2123
2124 static int rpc_ping(struct rpc_clnt *clnt)
2125 {
2126         struct rpc_message msg = {
2127                 .rpc_proc = &rpcproc_null,
2128         };
2129         int err;
2130         msg.rpc_cred = authnull_ops.lookup_cred(NULL, NULL, 0);
2131         err = rpc_call_sync(clnt, &msg, RPC_TASK_SOFT | RPC_TASK_SOFTCONN);
2132         put_rpccred(msg.rpc_cred);
2133         return err;
2134 }
2135
2136 struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
2137 {
2138         struct rpc_message msg = {
2139                 .rpc_proc = &rpcproc_null,
2140                 .rpc_cred = cred,
2141         };
2142         struct rpc_task_setup task_setup_data = {
2143                 .rpc_client = clnt,
2144                 .rpc_message = &msg,
2145                 .callback_ops = &rpc_default_ops,
2146                 .flags = flags,
2147         };
2148         return rpc_run_task(&task_setup_data);
2149 }
2150 EXPORT_SYMBOL_GPL(rpc_call_null);
2151
2152 #ifdef RPC_DEBUG
2153 static void rpc_show_header(void)
2154 {
2155         printk(KERN_INFO "-pid- flgs status -client- --rqstp- "
2156                 "-timeout ---ops--\n");
2157 }
2158
2159 static void rpc_show_task(const struct rpc_clnt *clnt,
2160                           const struct rpc_task *task)
2161 {
2162         const char *rpc_waitq = "none";
2163
2164         if (RPC_IS_QUEUED(task))
2165                 rpc_waitq = rpc_qname(task->tk_waitqueue);
2166
2167         printk(KERN_INFO "%5u %04x %6d %8p %8p %8ld %8p %sv%u %s a:%ps q:%s\n",
2168                 task->tk_pid, task->tk_flags, task->tk_status,
2169                 clnt, task->tk_rqstp, task->tk_timeout, task->tk_ops,
2170                 clnt->cl_protname, clnt->cl_vers, rpc_proc_name(task),
2171                 task->tk_action, rpc_waitq);
2172 }
2173
2174 void rpc_show_tasks(struct net *net)
2175 {
2176         struct rpc_clnt *clnt;
2177         struct rpc_task *task;
2178         int header = 0;
2179         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
2180
2181         spin_lock(&sn->rpc_client_lock);
2182         list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
2183                 spin_lock(&clnt->cl_lock);
2184                 list_for_each_entry(task, &clnt->cl_tasks, tk_task) {
2185                         if (!header) {
2186                                 rpc_show_header();
2187                                 header++;
2188                         }
2189                         rpc_show_task(clnt, task);
2190                 }
2191                 spin_unlock(&clnt->cl_lock);
2192         }
2193         spin_unlock(&sn->rpc_client_lock);
2194 }
2195 #endif