Merge tag 'split-asm_system_h-for-linus-20120328' of git://git.kernel.org/pub/scm...
[linux-flexiantxendom0-3.2.10.git] / net / sunrpc / clnt.c
index d3daab6..6797246 100644 (file)
 #include <linux/in.h>
 #include <linux/in6.h>
 #include <linux/un.h>
+#include <linux/rcupdate.h>
 
 #include <linux/sunrpc/clnt.h>
 #include <linux/sunrpc/rpc_pipe_fs.h>
 #include <linux/sunrpc/metrics.h>
 #include <linux/sunrpc/bc_xprt.h>
+#include <trace/events/sunrpc.h>
 
 #include "sunrpc.h"
+#include "netns.h"
 
 #ifdef RPC_DEBUG
 # define RPCDBG_FACILITY       RPCDBG_CALL
@@ -49,8 +52,6 @@
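-/*
- * All RPC clients are linked into this list
- */
+/*
+ * All RPC clients are linked into the all_clients list of the
+ * struct sunrpc_net belonging to their network namespace; that
+ * structure's rpc_client_lock protects the list.
+ */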
-static LIST_HEAD(all_clients);
-static DEFINE_SPINLOCK(rpc_client_lock);
 
 static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
 
@@ -80,82 +81,191 @@ static int rpc_ping(struct rpc_clnt *clnt);
 
+/*
+ * Add @clnt to the all_clients list kept in the struct sunrpc_net of the
+ * network namespace reported by rpc_net_ns(), holding that structure's
+ * rpc_client_lock across the list update.
+ */
 static void rpc_register_client(struct rpc_clnt *clnt)
 {
-       spin_lock(&rpc_client_lock);
-       list_add(&clnt->cl_clients, &all_clients);
-       spin_unlock(&rpc_client_lock);
+       struct net *net = rpc_net_ns(clnt);
+       struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
+
+       spin_lock(&sn->rpc_client_lock);
+       list_add(&clnt->cl_clients, &sn->all_clients);
+       spin_unlock(&sn->rpc_client_lock);
 }
 
+/*
+ * Unlink @clnt from the client list it was registered on.  The lock taken
+ * is the rpc_client_lock of the struct sunrpc_net for the namespace that
+ * rpc_net_ns() reports for this client.
+ */
 static void rpc_unregister_client(struct rpc_clnt *clnt)
 {
-       spin_lock(&rpc_client_lock);
+       struct net *net = rpc_net_ns(clnt);
+       struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
+
+       spin_lock(&sn->rpc_client_lock);
        list_del(&clnt->cl_clients);
-       spin_unlock(&rpc_client_lock);
+       spin_unlock(&sn->rpc_client_lock);
 }
 
-static int
-rpc_setup_pipedir(struct rpc_clnt *clnt, char *dir_name)
+/*
+ * Tear down the client's pipefs directory, if it has one, and clear
+ * cl_dentry.  The auth flavour's pipes_destroy hook, when there is one,
+ * runs before the directory itself is removed.
+ */
+static void __rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
+{
+       struct rpc_auth *auth = clnt->cl_auth;
+
+       /* Nothing was created for this client: cl_dentry is already NULL. */
+       if (clnt->cl_dentry == NULL)
+               return;
+
+       if (auth != NULL && auth->au_ops->pipes_destroy != NULL)
+               auth->au_ops->pipes_destroy(auth);
+       rpc_remove_client_dir(clnt->cl_dentry);
+       clnt->cl_dentry = NULL;
+}
+
+/*
+ * Remove the client's pipefs directory, but only when rpc_get_sb_net()
+ * finds a pipefs superblock for the client's network namespace.  The
+ * superblock obtained that way is handed back with rpc_put_sb_net() once
+ * the directory is gone.
+ */
+static void rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
+{
+       struct net *net = rpc_net_ns(clnt);
+
+       if (rpc_get_sb_net(net) == NULL)
+               return;
+
+       __rpc_clnt_remove_pipedir(clnt);
+       rpc_put_sb_net(net);
+}
+
+/*
+ * Create a "clnt%x" directory for @clnt below @dir_name in the pipefs
+ * instance @sb.
+ *
+ * The directory name is built from a function-static counter; creation is
+ * retried with the next counter value for as long as it fails with -EEXIST.
+ *
+ * Returns NULL when rpc_d_lookup_sb() cannot find @dir_name, the new dentry
+ * on success, or the ERR_PTR handed back by rpc_create_client_dir() for any
+ * other failure.
+ */
+static struct dentry *rpc_setup_pipedir_sb(struct super_block *sb,
+                                   struct rpc_clnt *clnt,
+                                   const char *dir_name)
 {
        static uint32_t clntid;
-       struct path path, dir;
        char name[15];
        struct qstr q = {
                .name = name,
        };
+       struct dentry *dir, *dentry;
        int error;
 
-       clnt->cl_path.mnt = ERR_PTR(-ENOENT);
-       clnt->cl_path.dentry = ERR_PTR(-ENOENT);
-       if (dir_name == NULL)
-               return 0;
-
-       path.mnt = rpc_get_mount();
-       if (IS_ERR(path.mnt))
-               return PTR_ERR(path.mnt);
-       error = vfs_path_lookup(path.mnt->mnt_root, path.mnt, dir_name, 0, &dir);
-       if (error)
-               goto err;
-
+       dir = rpc_d_lookup_sb(sb, dir_name);
+       /* Parent directory not found: hand the NULL straight back. */
+       if (dir == NULL)
+               return dir;
        for (;;) {
                q.len = snprintf(name, sizeof(name), "clnt%x", (unsigned int)clntid++);
                name[sizeof(name) - 1] = '\0';
                q.hash = full_name_hash(q.name, q.len);
-               path.dentry = rpc_create_client_dir(dir.dentry, &q, clnt);
-               if (!IS_ERR(path.dentry))
+               dentry = rpc_create_client_dir(dir, &q, clnt);
+               if (!IS_ERR(dentry))
                        break;
-               error = PTR_ERR(path.dentry);
+               error = PTR_ERR(dentry);
                if (error != -EEXIST) {
                        printk(KERN_INFO "RPC: Couldn't create pipefs entry"
                                        " %s/%s, error %d\n",
                                        dir_name, name, error);
-                       goto err_path_put;
+                       /* Leave the loop with the ERR_PTR still in dentry. */
+                       break;
                }
        }
-       path_put(&dir);
-       clnt->cl_path = path;
+       /* Drop the reference on the parent directory on every exit path. */
+       dput(dir);
+       return dentry;
+}
+
+/*
+ * Set up clnt->cl_dentry for a new client.
+ *
+ * cl_dentry is left NULL and 0 is returned when @dir_name is NULL or when
+ * rpc_get_sb_net() finds no pipefs superblock for the client's namespace.
+ * Otherwise the directory is created through rpc_setup_pipedir_sb(); an
+ * ERR_PTR result is turned into the matching negative errno.
+ */
+static int
+rpc_setup_pipedir(struct rpc_clnt *clnt, const char *dir_name)
+{
+       struct net *net = rpc_net_ns(clnt);
+       struct super_block *pipefs_sb;
+       struct dentry *dentry;
+
+       clnt->cl_dentry = NULL;
+       if (dir_name == NULL)
+               return 0;
+       pipefs_sb = rpc_get_sb_net(net);
+       /* No superblock is not treated as an error here. */
+       if (!pipefs_sb)
+               return 0;
+       dentry = rpc_setup_pipedir_sb(pipefs_sb, clnt, dir_name);
+       rpc_put_sb_net(net);
+       if (IS_ERR(dentry))
+               return PTR_ERR(dentry);
+       /* dentry may be NULL here (parent directory missing); stored as-is. */
+       clnt->cl_dentry = dentry;
        return 0;
-err_path_put:
-       path_put(&dir);
-err:
-       rpc_put_mount();
+}
+
+/*
+ * Apply one pipefs notifier event to a single RPC client.
+ *
+ * RPC_PIPEFS_MOUNT:  create the client's directory in @sb (unless its
+ *                    program has no pipe_dir_name) and then let the auth
+ *                    flavour create its pipes; if that fails the directory
+ *                    is removed again.
+ * RPC_PIPEFS_UMOUNT: remove the client's directory.
+ *
+ * Returns 0 on success, a negative errno from directory or pipe creation,
+ * or -ENOTSUPP for an event that is not recognised.
+ */
+static int __rpc_pipefs_event(struct rpc_clnt *clnt, unsigned long event,
+                               struct super_block *sb)
+{
+       struct dentry *dentry;
+       int err = 0;
+
+       switch (event) {
+       case RPC_PIPEFS_MOUNT:
+               if (clnt->cl_program->pipe_dir_name == NULL)
+                       break;
+               dentry = rpc_setup_pipedir_sb(sb, clnt,
+                                             clnt->cl_program->pipe_dir_name);
+               BUG_ON(dentry == NULL);
+               if (IS_ERR(dentry))
+                       return PTR_ERR(dentry);
+               clnt->cl_dentry = dentry;
+               /*
+                * Guard against a client without an auth handle, the same
+                * way __rpc_clnt_remove_pipedir() does before pipes_destroy.
+                */
+               if (clnt->cl_auth && clnt->cl_auth->au_ops->pipes_create) {
+                       err = clnt->cl_auth->au_ops->pipes_create(clnt->cl_auth);
+                       if (err)
+                               __rpc_clnt_remove_pipedir(clnt);
+               }
+               break;
+       case RPC_PIPEFS_UMOUNT:
+               __rpc_clnt_remove_pipedir(clnt);
+               break;
+       default:
+               /* event is unsigned long, so print it with %lu */
+               printk(KERN_ERR "%s: unknown event: %lu\n", __func__, event);
+               return -ENOTSUPP;
+       }
+       return err;
+}
+
+/*
+ * Find the next registered client in @net that still needs @event applied.
+ *
+ * Returns the client with an extra cl_count reference held (the caller
+ * drops it again), or NULL when no client is left to process.
+ *
+ * Clients are skipped when:
+ *  - their program has no pipe_dir_name: __rpc_pipefs_event() never changes
+ *    cl_dentry for such a client, so returning it would make the caller's
+ *    loop pick the same client again forever on RPC_PIPEFS_MOUNT;
+ *  - they are already in the state the event leads to (cl_dentry set for
+ *    MOUNT, cl_dentry clear for UMOUNT);
+ *  - their cl_count has already dropped to zero.  Such a client is
+ *    presumably on its way through rpc_free_client() and merely not yet
+ *    unlinked from the list, so no new reference may be taken on it.
+ */
+static struct rpc_clnt *rpc_get_client_for_event(struct net *net, int event)
+{
+       struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
+       struct rpc_clnt *clnt;
+
+       spin_lock(&sn->rpc_client_lock);
+       list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
+               if (clnt->cl_program->pipe_dir_name == NULL)
+                       continue;
+               if (((event == RPC_PIPEFS_MOUNT) && clnt->cl_dentry) ||
+                   ((event == RPC_PIPEFS_UMOUNT) && !clnt->cl_dentry))
+                       continue;
+               if (atomic_inc_not_zero(&clnt->cl_count) == 0)
+                       continue;
+               spin_unlock(&sn->rpc_client_lock);
+               return clnt;
+       }
+       spin_unlock(&sn->rpc_client_lock);
+       return NULL;
+}
+
+/*
+ * Notifier callback for pipefs events.
+ *
+ * @ptr is the pipefs super_block; its s_fs_info is passed on as the network
+ * namespace whose clients are to be processed.  Clients are fetched one at
+ * a time with rpc_get_client_for_event(), handled by __rpc_pipefs_event()
+ * and released again.  Processing stops at the first error, which is
+ * returned; otherwise the result is 0.
+ */
+static int rpc_pipefs_event(struct notifier_block *nb, unsigned long event,
+                           void *ptr)
+{
+       struct super_block *sb = ptr;
+       struct rpc_clnt *clnt;
+       int error = 0;
+
+       while ((clnt = rpc_get_client_for_event(sb->s_fs_info, event))) {
+               error = __rpc_pipefs_event(clnt, event, sb);
+               /* Drop the reference taken by rpc_get_client_for_event(). */
+               rpc_release_client(clnt);
+               if (error)
+                       break;
+       }
        return error;
 }
 
+/*
+ * Notifier block that routes pipefs events to rpc_pipefs_event(), using
+ * priority SUNRPC_PIPEFS_RPC_PRIO.
+ */
+static struct notifier_block rpc_clients_block = {
+       .notifier_call  = rpc_pipefs_event,
+       .priority       = SUNRPC_PIPEFS_RPC_PRIO,
+};
+
+/*
+ * Register rpc_clients_block through rpc_pipefs_notifier_register() and
+ * return that call's result unchanged.
+ */
+int rpc_clients_notifier_register(void)
+{
+       return rpc_pipefs_notifier_register(&rpc_clients_block);
+}
+
+/*
+ * Unregister rpc_clients_block through rpc_pipefs_notifier_unregister().
+ * Nothing is returned: this function is void, so the call is made as a
+ * plain statement rather than as the operand of "return".
+ */
+void rpc_clients_notifier_unregister(void)
+{
+       rpc_pipefs_notifier_unregister(&rpc_clients_block);
+}
+
 static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args, struct rpc_xprt *xprt)
 {
-       struct rpc_program      *program = args->program;
-       struct rpc_version      *version;
+       const struct rpc_program *program = args->program;
+       const struct rpc_version *version;
        struct rpc_clnt         *clnt = NULL;
        struct rpc_auth         *auth;
        int err;
-       size_t len;
 
        /* sanity check the name before trying to print it */
-       err = -EINVAL;
-       len = strlen(args->servername);
-       if (len > RPC_MAXNETNAMELEN)
-               goto out_no_rpciod;
-       len++;
-
        dprintk("RPC:       creating %s client for %s (xprt %p)\n",
                        program->name, args->servername, xprt);
 
@@ -178,17 +288,7 @@ static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args, stru
                goto out_err;
        clnt->cl_parent = clnt;
 
-       clnt->cl_server = clnt->cl_inline_name;
-       if (len > sizeof(clnt->cl_inline_name)) {
-               char *buf = kmalloc(len, GFP_KERNEL);
-               if (buf != NULL)
-                       clnt->cl_server = buf;
-               else
-                       len = sizeof(clnt->cl_inline_name);
-       }
-       strlcpy(clnt->cl_server, args->servername, len);
-
-       clnt->cl_xprt     = xprt;
+       rcu_assign_pointer(clnt->cl_xprt, xprt);
        clnt->cl_procinfo = version->procs;
        clnt->cl_maxproc  = version->nrprocs;
        clnt->cl_protname = program->name;
@@ -203,7 +303,7 @@ static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args, stru
        INIT_LIST_HEAD(&clnt->cl_tasks);
        spin_lock_init(&clnt->cl_lock);
 
-       if (!xprt_bound(clnt->cl_xprt))
+       if (!xprt_bound(xprt))
                clnt->cl_autobind = 1;
 
        clnt->cl_timeout = xprt->timeout;
@@ -245,17 +345,12 @@ static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args, stru
        return clnt;
 
 out_no_auth:
-       if (!IS_ERR(clnt->cl_path.dentry)) {
-               rpc_remove_client_dir(clnt->cl_path.dentry);
-               rpc_put_mount();
-       }
+       rpc_clnt_remove_pipedir(clnt);
 out_no_path:
        kfree(clnt->cl_principal);
 out_no_principal:
        rpc_free_iostats(clnt->cl_metrics);
 out_no_stats:
-       if (clnt->cl_server != clnt->cl_inline_name)
-               kfree(clnt->cl_server);
        kfree(clnt);
 out_err:
        xprt_put(xprt);
@@ -285,6 +380,7 @@ struct rpc_clnt *rpc_create(struct rpc_create_args *args)
                .srcaddr = args->saddress,
                .dstaddr = args->address,
                .addrlen = args->addrsize,
+               .servername = args->servername,
                .bc_xprt = args->bc_xprt,
        };
        char servername[48];
@@ -293,7 +389,7 @@ struct rpc_clnt *rpc_create(struct rpc_create_args *args)
         * If the caller chooses not to specify a hostname, whip
         * up a string representation of the passed-in address.
         */
-       if (args->servername == NULL) {
+       if (xprtargs.servername == NULL) {
                struct sockaddr_un *sun =
                                (struct sockaddr_un *)args->address;
                struct sockaddr_in *sin =
@@ -320,7 +416,7 @@ struct rpc_clnt *rpc_create(struct rpc_create_args *args)
                         * address family isn't recognized. */
                        return ERR_PTR(-EINVAL);
                }
-               args->servername = servername;
+               xprtargs.servername = servername;
        }
 
        xprt = xprt_create_transport(&xprtargs);
@@ -373,6 +469,7 @@ struct rpc_clnt *
 rpc_clone_client(struct rpc_clnt *clnt)
 {
        struct rpc_clnt *new;
+       struct rpc_xprt *xprt;
        int err = -ENOMEM;
 
        new = kmemdup(clnt, sizeof(*new), GFP_KERNEL);
@@ -392,18 +489,25 @@ rpc_clone_client(struct rpc_clnt *clnt)
                if (new->cl_principal == NULL)
                        goto out_no_principal;
        }
+       rcu_read_lock();
+       xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
+       rcu_read_unlock();
+       if (xprt == NULL)
+               goto out_no_transport;
+       rcu_assign_pointer(new->cl_xprt, xprt);
        atomic_set(&new->cl_count, 1);
        err = rpc_setup_pipedir(new, clnt->cl_program->pipe_dir_name);
        if (err != 0)
                goto out_no_path;
        if (new->cl_auth)
                atomic_inc(&new->cl_auth->au_count);
-       xprt_get(clnt->cl_xprt);
        atomic_inc(&clnt->cl_count);
        rpc_register_client(new);
        rpciod_up();
        return new;
 out_no_path:
+       xprt_put(xprt);
+out_no_transport:
        kfree(new->cl_principal);
 out_no_principal:
        rpc_free_iostats(new->cl_metrics);
@@ -452,8 +556,9 @@ EXPORT_SYMBOL_GPL(rpc_killall_tasks);
  */
 void rpc_shutdown_client(struct rpc_clnt *clnt)
 {
-       dprintk("RPC:       shutting down %s client for %s\n",
-                       clnt->cl_protname, clnt->cl_server);
+       dprintk_rcu("RPC:       shutting down %s client for %s\n",
+                       clnt->cl_protname,
+                       rcu_dereference(clnt->cl_xprt)->servername);
 
        while (!list_empty(&clnt->cl_tasks)) {
                rpc_killall_tasks(clnt);
@@ -471,24 +576,17 @@ EXPORT_SYMBOL_GPL(rpc_shutdown_client);
 static void
 rpc_free_client(struct rpc_clnt *clnt)
 {
-       dprintk("RPC:       destroying %s client for %s\n",
-                       clnt->cl_protname, clnt->cl_server);
-       if (!IS_ERR(clnt->cl_path.dentry)) {
-               rpc_remove_client_dir(clnt->cl_path.dentry);
-               rpc_put_mount();
-       }
-       if (clnt->cl_parent != clnt) {
+       dprintk_rcu("RPC:       destroying %s client for %s\n",
+                       clnt->cl_protname,
+                       rcu_dereference(clnt->cl_xprt)->servername);
+       /*
+        * cl_parent differs from clnt only for a client that has a parent
+        * (presumably a clone -- confirm in rpc_clone_client()); release
+        * the reference held on that parent.
+        */
+       if (clnt->cl_parent != clnt)
                rpc_release_client(clnt->cl_parent);
-               goto out_free;
-       }
-       if (clnt->cl_server != clnt->cl_inline_name)
-               kfree(clnt->cl_server);
-out_free:
        rpc_unregister_client(clnt);
+       /* Done after unregistering, so the client is off the list first. */
+       rpc_clnt_remove_pipedir(clnt);
        rpc_free_iostats(clnt->cl_metrics);
        kfree(clnt->cl_principal);
        clnt->cl_metrics = NULL;
-       xprt_put(clnt->cl_xprt);
+       /*
+        * cl_xprt is read with rcu_dereference_raw(); rcu_read_lock() is
+        * not taken in this function.
+        */
+       xprt_put(rcu_dereference_raw(clnt->cl_xprt));
        rpciod_down();
        kfree(clnt);
 }
@@ -541,11 +639,11 @@ rpc_release_client(struct rpc_clnt *clnt)
  * The Sun NFSv2/v3 ACL protocol can do this.
  */
 struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
-                                     struct rpc_program *program,
+                                     const struct rpc_program *program,
                                      u32 vers)
 {
        struct rpc_clnt *clnt;
-       struct rpc_version *version;
+       const struct rpc_version *version;
        int err;
 
        BUG_ON(vers >= program->nrvers || !program->version[vers]);
@@ -777,13 +875,18 @@ EXPORT_SYMBOL_GPL(rpc_call_start);
 size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
 {
        size_t bytes;
-       struct rpc_xprt *xprt = clnt->cl_xprt;
+       struct rpc_xprt *xprt;
 
-       bytes = sizeof(xprt->addr);
+       /*
+        * cl_xprt is read through rcu_dereference(), so the transport is
+        * only touched inside this RCU read-side section.
+        */
+       rcu_read_lock();
+       xprt = rcu_dereference(clnt->cl_xprt);
+
+       /* Copy the peer address, truncated to what the caller's buffer holds. */
+       bytes = xprt->addrlen;
        if (bytes > bufsize)
                bytes = bufsize;
-       memcpy(buf, &clnt->cl_xprt->addr, bytes);
-       return xprt->addrlen;
+       memcpy(buf, &xprt->addr, bytes);
+       rcu_read_unlock();
+
+       /* Number of bytes actually copied; may be smaller than addrlen. */
+       return bytes;
 }
 EXPORT_SYMBOL_GPL(rpc_peeraddr);
 
@@ -792,11 +895,16 @@ EXPORT_SYMBOL_GPL(rpc_peeraddr);
  * @clnt: RPC client structure
  * @format: address format
  *
+ * NB: the lifetime of the memory referenced by the returned pointer is
+ * the same as the rpc_xprt itself.  As long as the caller uses this
+ * pointer, it must hold the RCU read lock.
  */
 const char *rpc_peeraddr2str(struct rpc_clnt *clnt,
                             enum rpc_display_format_t format)
 {
-       struct rpc_xprt *xprt = clnt->cl_xprt;
+       struct rpc_xprt *xprt;
+
+       xprt = rcu_dereference(clnt->cl_xprt);
 
        if (xprt->address_strings[format] != NULL)
                return xprt->address_strings[format];
@@ -805,17 +913,203 @@ const char *rpc_peeraddr2str(struct rpc_clnt *clnt,
 }
 EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
 
+/*
+ * NOTE: despite the "loopback" in its name, this is the IPv4 wildcard
+ * address (INADDR_ANY); the port is left zero-initialized.
+ */
+static const struct sockaddr_in rpc_inaddr_loopback = {
+       .sin_family             = AF_INET,
+       .sin_addr.s_addr        = htonl(INADDR_ANY),
+};
+
+/*
+ * NOTE: despite the "loopback" in its name, this is the IPv6 wildcard
+ * address (IN6ADDR_ANY_INIT); the port is left zero-initialized.
+ */
+static const struct sockaddr_in6 rpc_in6addr_loopback = {
+       .sin6_family            = AF_INET6,
+       .sin6_addr              = IN6ADDR_ANY_INIT,
+};
+
+/*
+ * Try a getsockname() on a connected datagram socket.  Using a
+ * connected datagram socket prevents leaving a socket in TIME_WAIT.
+ * This conserves the ephemeral port number space.
+ *
+ * @net:    network namespace the probe socket is created in
+ * @sap:    remote address to connect the probe socket to
+ * @salen:  length of @sap, in bytes
+ * @buf:    receives the local address chosen for the socket
+ * @buflen: size of @buf, in bytes
+ *
+ * The socket is bound to the wildcard address of @sap's family before it
+ * is connected.  Once the socket has been created it is released on every
+ * exit path, including the one taken for an unsupported address family.
+ *
+ * Returns zero and fills in "buf" if successful; otherwise, a
+ * negative errno is returned.
+ */
+static int rpc_sockname(struct net *net, struct sockaddr *sap, size_t salen,
+                       struct sockaddr *buf, int buflen)
+{
+       struct socket *sock;
+       int err;
+
+       err = __sock_create(net, sap->sa_family,
+                               SOCK_DGRAM, IPPROTO_UDP, &sock, 1);
+       if (err < 0) {
+               dprintk("RPC:       can't create UDP socket (%d)\n", err);
+               goto out;
+       }
+
+       switch (sap->sa_family) {
+       case AF_INET:
+               err = kernel_bind(sock,
+                               (struct sockaddr *)&rpc_inaddr_loopback,
+                               sizeof(rpc_inaddr_loopback));
+               break;
+       case AF_INET6:
+               err = kernel_bind(sock,
+                               (struct sockaddr *)&rpc_in6addr_loopback,
+                               sizeof(rpc_in6addr_loopback));
+               break;
+       default:
+               err = -EAFNOSUPPORT;
+               /* The socket already exists here: it must be released. */
+               goto out_release;
+       }
+       if (err < 0) {
+               dprintk("RPC:       can't bind UDP socket (%d)\n", err);
+               goto out_release;
+       }
+
+       err = kernel_connect(sock, sap, salen, 0);
+       if (err < 0) {
+               dprintk("RPC:       can't connect UDP socket (%d)\n", err);
+               goto out_release;
+       }
+
+       err = kernel_getsockname(sock, buf, &buflen);
+       if (err < 0) {
+               dprintk("RPC:       getsockname failed (%d)\n", err);
+               goto out_release;
+       }
+
+       err = 0;
+       /* The scope id of an IPv6 result is cleared before it is returned. */
+       if (buf->sa_family == AF_INET6) {
+               struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)buf;
+               sin6->sin6_scope_id = 0;
+       }
+       dprintk("RPC:       %s succeeded\n", __func__);
+
+out_release:
+       sock_release(sock);
+out:
+       return err;
+}
+
+/*
+ * Scraping a connected socket failed, so we don't have a useable
+ * local address.  Fallback: generate an address that will prevent
+ * the server from calling us back.
+ *
+ * @family: address family to generate the wildcard address for
+ * @buf:    target buffer
+ * @buflen: size of @buf, in bytes
+ *
+ * Returns zero and fills in "buf" if successful; otherwise, a
+ * negative errno is returned: -EINVAL when @buf is too small for the
+ * address, -EAFNOSUPPORT when @family is neither AF_INET nor AF_INET6.
+ */
+static int rpc_anyaddr(int family, struct sockaddr *buf, size_t buflen)
+{
+       switch (family) {
+       case AF_INET:
+               if (buflen < sizeof(rpc_inaddr_loopback))
+                       return -EINVAL;
+               memcpy(buf, &rpc_inaddr_loopback,
+                               sizeof(rpc_inaddr_loopback));
+               break;
+       case AF_INET6:
+               if (buflen < sizeof(rpc_in6addr_loopback))
+                       return -EINVAL;
+               memcpy(buf, &rpc_in6addr_loopback,
+                               sizeof(rpc_in6addr_loopback));
+               /* Must not fall through into the unsupported-family path. */
+               break;
+       default:
+               dprintk("RPC:       %s: address family not supported\n",
+                       __func__);
+               return -EAFNOSUPPORT;
+       }
+       dprintk("RPC:       %s: succeeded\n", __func__);
+       return 0;
+}
+
+/**
+ * rpc_localaddr - discover local endpoint address for an RPC client
+ * @clnt: RPC client structure
+ * @buf: target buffer
+ * @buflen: size of target buffer, in bytes
+ *
+ * Returns zero and fills in "buf" if successful; otherwise, a negative
+ * errno is returned.
+ *
+ * The address is probed with rpc_sockname() against a copy of the
+ * transport's peer address whose port has been set to zero.  When the
+ * probe fails, the result of rpc_anyaddr() is returned instead.
+ *
+ * This works even if the underlying transport is not currently connected,
+ * or if the upper layer never previously provided a source address.
+ *
+ * The result of this function call is transient: multiple calls in
+ * succession may give different results, depending on how local
+ * networking configuration changes over time.
+ */
+int rpc_localaddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t buflen)
+{
+       struct sockaddr_storage peer;
+       struct sockaddr *sap = (struct sockaddr *)&peer;
+       struct rpc_xprt *xprt;
+       struct net *net;
+       size_t salen;
+       int status;
+
+       /* Copy the peer address and take a namespace reference under RCU. */
+       rcu_read_lock();
+       xprt = rcu_dereference(clnt->cl_xprt);
+       salen = xprt->addrlen;
+       memcpy(sap, &xprt->addr, salen);
+       net = get_net(xprt->xprt_net);
+       rcu_read_unlock();
+
+       rpc_set_port(sap, 0);
+       status = rpc_sockname(net, sap, salen, buf, buflen);
+       put_net(net);
+       if (status == 0)
+               return 0;
+
+       /* Couldn't discover local address, return ANYADDR */
+       return rpc_anyaddr(sap->sa_family, buf, buflen);
+}
+EXPORT_SYMBOL_GPL(rpc_localaddr);
+
 void
 rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
 {
-       struct rpc_xprt *xprt = clnt->cl_xprt;
+       struct rpc_xprt *xprt;
+
+       /*
+        * The transport is looked up and its set_buffer_size hook is
+        * called inside one RCU read-side section.  The hook is optional:
+        * a transport without it is left untouched.
+        */
+       rcu_read_lock();
+       xprt = rcu_dereference(clnt->cl_xprt);
        if (xprt->ops->set_buffer_size)
                xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
+       rcu_read_unlock();
 }
 EXPORT_SYMBOL_GPL(rpc_setbufsize);
 
-/*
- * Return size of largest payload RPC client can support, in bytes
+/**
+ * rpc_protocol - Get transport protocol number for an RPC client
+ * @clnt: RPC client to query
+ *
+ * Returns the prot field of the client's current transport, read inside
+ * an RCU read-side section.
+ */
+int rpc_protocol(struct rpc_clnt *clnt)
+{
+       struct rpc_xprt *xprt;
+       int prot;
+
+       rcu_read_lock();
+       xprt = rcu_dereference(clnt->cl_xprt);
+       prot = xprt->prot;
+       rcu_read_unlock();
+
+       return prot;
+}
+EXPORT_SYMBOL_GPL(rpc_protocol);
+
+/**
+ * rpc_net_ns - Get the network namespace for this RPC client
+ * @clnt: RPC client to query
+ *
+ * Returns the xprt_net pointer of the client's current transport.  The
+ * pointer is read inside an RCU read-side section; this function does not
+ * take a reference on the namespace.
+ */
+struct net *rpc_net_ns(struct rpc_clnt *clnt)
+{
+       struct rpc_xprt *xprt;
+       struct net *net;
+
+       rcu_read_lock();
+       xprt = rcu_dereference(clnt->cl_xprt);
+       net = xprt->xprt_net;
+       rcu_read_unlock();
+
+       return net;
+}
+EXPORT_SYMBOL_GPL(rpc_net_ns);
+
+/**
+ * rpc_max_payload - Get maximum payload size for a transport, in bytes
+ * @clnt: RPC client to query
  *
  * For stream transports, this is one RPC record fragment (see RFC
  * 1831), as we don't support multi-record requests yet.  For datagram
@@ -824,7 +1118,12 @@ EXPORT_SYMBOL_GPL(rpc_setbufsize);
  */
 size_t rpc_max_payload(struct rpc_clnt *clnt)
 {
-       return clnt->cl_xprt->max_payload;
+       size_t ret;
+
+       /*
+        * Read max_payload from the current transport inside an RCU
+        * read-side section and return the copied value.
+        */
+       rcu_read_lock();
+       ret = rcu_dereference(clnt->cl_xprt)->max_payload;
+       rcu_read_unlock();
+       return ret;
 }
 EXPORT_SYMBOL_GPL(rpc_max_payload);
 
@@ -835,8 +1134,11 @@ EXPORT_SYMBOL_GPL(rpc_max_payload);
  */
 void rpc_force_rebind(struct rpc_clnt *clnt)
 {
-       if (clnt->cl_autobind)
-               xprt_clear_bound(clnt->cl_xprt);
+       /*
+        * Only clients with cl_autobind set are affected.  The transport
+        * is looked up and xprt_clear_bound() is called on it inside an
+        * RCU read-side section.
+        */
+       if (clnt->cl_autobind) {
+               rcu_read_lock();
+               xprt_clear_bound(rcu_dereference(clnt->cl_xprt));
+               rcu_read_unlock();
+       }
 }
 EXPORT_SYMBOL_GPL(rpc_force_rebind);
 
@@ -1162,6 +1464,7 @@ call_bind_status(struct rpc_task *task)
                return;
        }
 
+       trace_rpc_bind_status(task);
        switch (task->tk_status) {
        case -ENOMEM:
                dprintk("RPC: %5u rpcbind out of memory\n", task->tk_pid);
@@ -1261,6 +1564,7 @@ call_connect_status(struct rpc_task *task)
                return;
        }
 
+       trace_rpc_connect_status(task, status);
        switch (status) {
                /* if soft mounted, test if we've timed out */
        case -ETIMEDOUT:
@@ -1449,6 +1753,7 @@ call_status(struct rpc_task *task)
                return;
        }
 
+       trace_rpc_call_status(task);
        task->tk_status = 0;
        switch(status) {
        case -EHOSTDOWN:
@@ -1512,8 +1817,11 @@ call_timeout(struct rpc_task *task)
        }
        if (RPC_IS_SOFT(task)) {
-               if (clnt->cl_chatty)
+               if (clnt->cl_chatty) {
+                       /*
+                        * servername is read through the RCU-managed
+                        * cl_xprt pointer; lock, print and unlock must
+                        * all sit under the cl_chatty test.
+                        */
+                       rcu_read_lock();
                        printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
-                               clnt->cl_protname, clnt->cl_server);
+                               clnt->cl_protname,
+                               rcu_dereference(clnt->cl_xprt)->servername);
+                       rcu_read_unlock();
+               }
                if (task->tk_flags & RPC_TASK_TIMEOUT)
                        rpc_exit(task, -ETIMEDOUT);
                else
@@ -1523,9 +1831,13 @@ call_timeout(struct rpc_task *task)
 
        if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
                task->tk_flags |= RPC_CALL_MAJORSEEN;
-               if (clnt->cl_chatty)
+               if (clnt->cl_chatty) {
+                       rcu_read_lock();
                        printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
-                       clnt->cl_protname, clnt->cl_server);
+                       clnt->cl_protname,
+                       rcu_dereference(clnt->cl_xprt)->servername);
+                       rcu_read_unlock();
+               }
        }
        rpc_force_rebind(clnt);
        /*
@@ -1554,9 +1866,13 @@ call_decode(struct rpc_task *task)
        dprint_status(task);
 
        if (task->tk_flags & RPC_CALL_MAJORSEEN) {
-               if (clnt->cl_chatty)
+               if (clnt->cl_chatty) {
+                       rcu_read_lock();
                        printk(KERN_NOTICE "%s: server %s OK\n",
-                               clnt->cl_protname, clnt->cl_server);
+                               clnt->cl_protname,
+                               rcu_dereference(clnt->cl_xprt)->servername);
+                       rcu_read_unlock();
+               }
                task->tk_flags &= ~RPC_CALL_MAJORSEEN;
        }
 
@@ -1634,6 +1950,7 @@ rpc_encode_header(struct rpc_task *task)
 static __be32 *
 rpc_verify_header(struct rpc_task *task)
 {
+       struct rpc_clnt *clnt = task->tk_client;
        struct kvec *iov = &task->tk_rqstp->rq_rcv_buf.head[0];
        int len = task->tk_rqstp->rq_rcv_buf.len >> 2;
        __be32  *p = iov->iov_base;
@@ -1706,8 +2023,11 @@ rpc_verify_header(struct rpc_task *task)
                        task->tk_action = call_bind;
                        goto out_retry;
                case RPC_AUTH_TOOWEAK:
+                       rcu_read_lock();
                        printk(KERN_NOTICE "RPC: server %s requires stronger "
-                              "authentication.\n", task->tk_client->cl_server);
+                              "authentication.\n",
+                              rcu_dereference(clnt->cl_xprt)->servername);
+                       rcu_read_unlock();
                        break;
                default:
                        dprintk("RPC: %5u %s: unknown auth error: %x\n",
@@ -1730,28 +2050,27 @@ rpc_verify_header(struct rpc_task *task)
        case RPC_SUCCESS:
                return p;
        case RPC_PROG_UNAVAIL:
-               dprintk("RPC: %5u %s: program %u is unsupported by server %s\n",
-                               task->tk_pid, __func__,
-                               (unsigned int)task->tk_client->cl_prog,
-                               task->tk_client->cl_server);
+               dprintk_rcu("RPC: %5u %s: program %u is unsupported "
+                               "by server %s\n", task->tk_pid, __func__,
+                               (unsigned int)clnt->cl_prog,
+                               rcu_dereference(clnt->cl_xprt)->servername);
                error = -EPFNOSUPPORT;
                goto out_err;
        case RPC_PROG_MISMATCH:
-               dprintk("RPC: %5u %s: program %u, version %u unsupported by "
-                               "server %s\n", task->tk_pid, __func__,
-                               (unsigned int)task->tk_client->cl_prog,
-                               (unsigned int)task->tk_client->cl_vers,
-                               task->tk_client->cl_server);
+               dprintk_rcu("RPC: %5u %s: program %u, version %u unsupported "
+                               "by server %s\n", task->tk_pid, __func__,
+                               (unsigned int)clnt->cl_prog,
+                               (unsigned int)clnt->cl_vers,
+                               rcu_dereference(clnt->cl_xprt)->servername);
                error = -EPROTONOSUPPORT;
                goto out_err;
        case RPC_PROC_UNAVAIL:
-               dprintk("RPC: %5u %s: proc %s unsupported by program %u, "
+               dprintk_rcu("RPC: %5u %s: proc %s unsupported by program %u, "
                                "version %u on server %s\n",
                                task->tk_pid, __func__,
                                rpc_proc_name(task),
-                               task->tk_client->cl_prog,
-                               task->tk_client->cl_vers,
-                               task->tk_client->cl_server);
+                               clnt->cl_prog, clnt->cl_vers,
+                               rcu_dereference(clnt->cl_xprt)->servername);
                error = -EOPNOTSUPP;
                goto out_err;
        case RPC_GARBAGE_ARGS:
@@ -1765,7 +2084,7 @@ rpc_verify_header(struct rpc_task *task)
        }
 
 out_garbage:
-       task->tk_client->cl_stats->rpcgarbage++;
+       clnt->cl_stats->rpcgarbage++;
        if (task->tk_garb_retry) {
                task->tk_garb_retry--;
                dprintk("RPC: %5u %s: retrying\n",
@@ -1851,14 +2170,15 @@ static void rpc_show_task(const struct rpc_clnt *clnt,
                task->tk_action, rpc_waitq);
 }
 
-void rpc_show_tasks(void)
+void rpc_show_tasks(struct net *net)
 {
        struct rpc_clnt *clnt;
        struct rpc_task *task;
        int header = 0;
+       struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
 
-       spin_lock(&rpc_client_lock);
-       list_for_each_entry(clnt, &all_clients, cl_clients) {
+       spin_lock(&sn->rpc_client_lock);
+       list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
                spin_lock(&clnt->cl_lock);
                list_for_each_entry(task, &clnt->cl_tasks, tk_task) {
                        if (!header) {
@@ -1869,6 +2189,6 @@ void rpc_show_tasks(void)
                }
                spin_unlock(&clnt->cl_lock);
        }
-       spin_unlock(&rpc_client_lock);
+       spin_unlock(&sn->rpc_client_lock);
 }
 #endif