- Updated to 3.4-rc1.
[linux-flexiantxendom0-3.2.10.git] / net/sunrpc/sched.c
index 9f62f49..d23b8aa 100644
 #include <linux/slab.h>
 #include <linux/mempool.h>
 #include <linux/smp.h>
-#include <linux/smp_lock.h>
 #include <linux/spinlock.h>
 #include <linux/mutex.h>
+#include <linux/freezer.h>
 
 #include <linux/sunrpc/clnt.h>
 
+#include "sunrpc.h"
+
 #ifdef RPC_DEBUG
 #define RPCDBG_FACILITY                RPCDBG_SCHED
-#define RPC_TASK_MAGIC_ID      0xf00baa
 #endif
 
+#define CREATE_TRACE_POINTS
+#include <trace/events/sunrpc.h>
+
 /*
  * RPC slabs and memory pools
  */
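
A note on the CREATE_TRACE_POINTS pair added above: this is the standard kernel tracepoint idiom. Exactly one translation unit defines the macro before including the trace header, which expands every TRACE_EVENT() in that header into real tracepoint bodies; every other file includes the header bare and gets only the inline trace_*() hooks. A minimal sketch of the pattern, with a hypothetical event name:

	/* include/trace/events/foo.h -- hypothetical trace header */
	#undef TRACE_SYSTEM
	#define TRACE_SYSTEM foo

	#if !defined(_TRACE_FOO_H) || defined(TRACE_HEADER_MULTI_READ)
	#define _TRACE_FOO_H
	#include <linux/tracepoint.h>

	TRACE_EVENT(foo_did_thing,
		TP_PROTO(int value),
		TP_ARGS(value),
		TP_STRUCT__entry(__field(int, value)),
		TP_fast_assign(__entry->value = value;),
		TP_printk("value=%d", __entry->value)
	);
	#endif /* _TRACE_FOO_H */
	#include <trace/define_trace.h>

	/* foo.c -- the one file that instantiates the events */
	#define CREATE_TRACE_POINTS
	#include <trace/events/foo.h>

	/* all other users: plain include, then fire the hook */
	trace_foo_did_thing(42);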
@@ -97,14 +101,16 @@ __rpc_add_timer(struct rpc_wait_queue *queue, struct rpc_task *task)
 /*
  * Add new request to a priority queue.
  */
-static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue, struct rpc_task *task)
+static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue,
+               struct rpc_task *task,
+               unsigned char queue_priority)
 {
        struct list_head *q;
        struct rpc_task *t;
 
        INIT_LIST_HEAD(&task->u.tk_wait.links);
-       q = &queue->tasks[task->tk_priority];
-       if (unlikely(task->tk_priority > queue->maxpriority))
+       q = &queue->tasks[queue_priority];
+       if (unlikely(queue_priority > queue->maxpriority))
                q = &queue->tasks[queue->maxpriority];
        list_for_each_entry(t, q, u.tk_wait.list) {
                if (t->tk_owner == task->tk_owner) {
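
For context on what queue_priority indexes (a sketch abridged from include/linux/sunrpc/sched.h of this era; the field list is not exhaustive): a priority wait queue is an array of lists, one per priority level, and __rpc_add_wait_queue_priority() additionally chains tasks that share an owner off the first queued task's u.tk_wait.links so a whole batch can be woken together. The new argument simply lets a caller file a task under a different level than its own tk_priority.

	struct rpc_wait_queue {
		spinlock_t		lock;
		struct list_head	tasks[RPC_NR_PRIORITY]; /* one list per level */
		unsigned char		maxpriority;	/* highest valid index */
		unsigned char		priority;	/* level currently being served */
		/* ... owner, counters, timer_list, qlen, name omitted ... */
	};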
@@ -123,12 +129,14 @@ static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue, struct r
  * improve overall performance.
  * Everyone else gets appended to the queue to ensure proper FIFO behavior.
  */
-static void __rpc_add_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
+static void __rpc_add_wait_queue(struct rpc_wait_queue *queue,
+               struct rpc_task *task,
+               unsigned char queue_priority)
 {
        BUG_ON (RPC_IS_QUEUED(task));
 
        if (RPC_IS_PRIORITY(queue))
-               __rpc_add_wait_queue_priority(queue, task);
+               __rpc_add_wait_queue_priority(queue, task, queue_priority);
        else if (RPC_IS_SWAPPER(task))
                list_add(&task->u.tk_wait.list, &queue->tasks[0]);
        else
@@ -200,15 +208,14 @@ static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const c
        queue->qlen = 0;
        setup_timer(&queue->timer_list.timer, __rpc_queue_timer_fn, (unsigned long)queue);
        INIT_LIST_HEAD(&queue->timer_list.list);
-#ifdef RPC_DEBUG
-       queue->name = qname;
-#endif
+       rpc_assign_waitqueue_name(queue, qname);
 }
 
 void rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname)
 {
        __rpc_init_priority_wait_queue(queue, qname, RPC_NR_PRIORITY);
 }
+EXPORT_SYMBOL_GPL(rpc_init_priority_wait_queue);
 
 void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname)
 {
@@ -226,7 +233,7 @@ static int rpc_wait_bit_killable(void *word)
 {
        if (fatal_signal_pending(current))
                return -ERESTARTSYS;
-       schedule();
+       freezable_schedule();
        return 0;
 }
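
The schedule() to freezable_schedule() switch above lets the freezer treat a process sleeping on RPC completion as already frozen during suspend or hibernate, instead of failing the freeze because an NFS wait will not wake up. In kernels of this vintage the helper is essentially the following (a sketch after include/linux/freezer.h; the exact form varies by version):

	/* step around the freezer for the duration of the sleep */
	static inline void freezable_schedule(void)
	{
		freezer_do_not_count();		/* freezer: do not wait for us */
		schedule();
		freezer_count();		/* rejoin; freeze here if asked */
	}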
 
@@ -235,7 +242,6 @@ static void rpc_task_set_debuginfo(struct rpc_task *task)
 {
        static atomic_t rpc_pid;
 
-       task->tk_magic = RPC_TASK_MAGIC_ID;
        task->tk_pid = atomic_inc_return(&rpc_pid);
 }
 #else
@@ -246,38 +252,47 @@ static inline void rpc_task_set_debuginfo(struct rpc_task *task)
 
 static void rpc_set_active(struct rpc_task *task)
 {
-       struct rpc_clnt *clnt;
-       if (test_and_set_bit(RPC_TASK_ACTIVE, &task->tk_runstate) != 0)
-               return;
+       trace_rpc_task_begin(task->tk_client, task, NULL);
+
        rpc_task_set_debuginfo(task);
-       /* Add to global list of all tasks */
-       clnt = task->tk_client;
-       if (clnt != NULL) {
-               spin_lock(&clnt->cl_lock);
-               list_add_tail(&task->tk_task, &clnt->cl_tasks);
-               spin_unlock(&clnt->cl_lock);
-       }
+       set_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
 }
 
 /*
  * Mark an RPC call as having completed by clearing the 'active' bit
+ * and then waking up all tasks that were sleeping.
  */
-static void rpc_mark_complete_task(struct rpc_task *task)
+static int rpc_complete_task(struct rpc_task *task)
 {
-       smp_mb__before_clear_bit();
+       void *m = &task->tk_runstate;
+       wait_queue_head_t *wq = bit_waitqueue(m, RPC_TASK_ACTIVE);
+       struct wait_bit_key k = __WAIT_BIT_KEY_INITIALIZER(m, RPC_TASK_ACTIVE);
+       unsigned long flags;
+       int ret;
+
+       trace_rpc_task_complete(task->tk_client, task, NULL);
+
+       spin_lock_irqsave(&wq->lock, flags);
        clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
-       smp_mb__after_clear_bit();
-       wake_up_bit(&task->tk_runstate, RPC_TASK_ACTIVE);
+       ret = atomic_dec_and_test(&task->tk_count);
+       if (waitqueue_active(wq))
+               __wake_up_locked_key(wq, TASK_NORMAL, &k);
+       spin_unlock_irqrestore(&wq->lock, flags);
+       return ret;
 }
 
 /*
  * Allow callers to wait for completion of an RPC call
+ *
+ * Note the use of out_of_line_wait_on_bit() rather than wait_on_bit()
+ * to enforce taking of the wq->lock and hence avoid races with
+ * rpc_complete_task().
  */
 int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *))
 {
        if (action == NULL)
                action = rpc_wait_bit_killable;
-       return wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
+       return out_of_line_wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
                        action, TASK_KILLABLE);
 }
 EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task);
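
Why out_of_line_wait_on_bit() matters here: rpc_complete_task() above clears RPC_TASK_ACTIVE, performs the final atomic_dec_and_test() on tk_count, and issues the wakeup, all under wq->lock. A plain wait_on_bit() waiter tests the bit locklessly on its fast path, which with the old clear-then-wake code permitted an interleaving like this (illustrative timeline, not real code):

	/* waker (old rpc_mark_complete_task)   waiter (wait_on_bit fast path)
	 *
	 * clear_bit(RPC_TASK_ACTIVE)
	 *                                      test_bit() -> clear, return
	 *                                      rpc_put_task() frees the task
	 * wake_up_bit(&task->tk_runstate)      <-- pointer into freed memory
	 *
	 * Forcing every waiter through wq->lock means no waiter can observe
	 * completion and drop the last reference while the waker is still
	 * inside its locked clear/decrement/wakeup sequence.
	 */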
@@ -293,21 +308,9 @@ static void rpc_make_runnable(struct rpc_task *task)
        rpc_clear_queued(task);
        if (rpc_test_and_set_running(task))
                return;
-       /* We might have raced */
-       if (RPC_IS_QUEUED(task)) {
-               rpc_clear_running(task);
-               return;
-       }
        if (RPC_IS_ASYNC(task)) {
-               int status;
-
                INIT_WORK(&task->u.tk_work, rpc_async_schedule);
-               status = queue_work(rpciod_workqueue, &task->u.tk_work);
-               if (status < 0) {
-                       printk(KERN_WARNING "RPC: failed to add task to queue: error: %d!\n", status);
-                       task->tk_status = status;
-                       return;
-               }
+               queue_work(rpciod_workqueue, &task->u.tk_work);
        } else
                wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED);
 }
@@ -318,18 +321,17 @@ static void rpc_make_runnable(struct rpc_task *task)
  * NB: An RPC task will only receive interrupt-driven events as long
  * as it's on a wait queue.
  */
-static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
-                       rpc_action action)
+static void __rpc_sleep_on_priority(struct rpc_wait_queue *q,
+               struct rpc_task *task,
+               rpc_action action,
+               unsigned char queue_priority)
 {
        dprintk("RPC: %5u sleep_on(queue \"%s\" time %lu)\n",
                        task->tk_pid, rpc_qname(q), jiffies);
 
-       if (!RPC_IS_ASYNC(task) && !RPC_IS_ACTIVATED(task)) {
-               printk(KERN_ERR "RPC: Inactive synchronous task put to sleep!\n");
-               return;
-       }
+       trace_rpc_task_sleep(task->tk_client, task, q);
 
-       __rpc_add_wait_queue(q, task);
+       __rpc_add_wait_queue(q, task, queue_priority);
 
        BUG_ON(task->tk_callback != NULL);
        task->tk_callback = action;
@@ -339,18 +341,32 @@ static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
 void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
                                rpc_action action)
 {
-       /* Mark the task as being activated if so needed */
-       rpc_set_active(task);
+       /* We shouldn't ever put an inactive task to sleep */
+       BUG_ON(!RPC_IS_ACTIVATED(task));
 
        /*
         * Protect the queue operations.
         */
        spin_lock_bh(&q->lock);
-       __rpc_sleep_on(q, task, action);
+       __rpc_sleep_on_priority(q, task, action, task->tk_priority);
        spin_unlock_bh(&q->lock);
 }
 EXPORT_SYMBOL_GPL(rpc_sleep_on);
 
+void rpc_sleep_on_priority(struct rpc_wait_queue *q, struct rpc_task *task,
+               rpc_action action, int priority)
+{
+       /* We shouldn't ever put an inactive task to sleep */
+       BUG_ON(!RPC_IS_ACTIVATED(task));
+
+       /*
+        * Protect the queue operations.
+        */
+       spin_lock_bh(&q->lock);
+       __rpc_sleep_on_priority(q, task, action, priority - RPC_PRIORITY_LOW);
+       spin_unlock_bh(&q->lock);
+}
+
 /**
  * __rpc_do_wake_up_task - wake up a single rpc_task
  * @queue: wait queue
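
On the rpc_sleep_on_priority() variant added above: the public priority constants start below zero (RPC_PRIORITY_LOW is -1, RPC_PRIORITY_NORMAL is 0, and so on upward), so subtracting RPC_PRIORITY_LOW rebases a public value onto the 0..RPC_NR_PRIORITY-1 range that indexes queue->tasks[]; rpc_init_task() applies the same offset when it seeds tk_priority. A hypothetical call, with an invented queue name:

	/* lands on tasks[1], since 0 - (-1) == 1 */
	rpc_sleep_on_priority(&some_waitq, task, NULL, RPC_PRIORITY_NORMAL);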
@@ -363,15 +379,14 @@ static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task
        dprintk("RPC: %5u __rpc_wake_up_task (now %lu)\n",
                        task->tk_pid, jiffies);
 
-#ifdef RPC_DEBUG
-       BUG_ON(task->tk_magic != RPC_TASK_MAGIC_ID);
-#endif
        /* Has the task been executed yet? If not, we cannot wake it up! */
        if (!RPC_IS_ACTIVATED(task)) {
                printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task);
                return;
        }
 
+       trace_rpc_task_wakeup(task->tk_client, task, queue);
+
        __rpc_remove_wait_queue(queue, task);
 
        rpc_make_runnable(task);
@@ -389,28 +404,34 @@ static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct r
 }
 
 /*
- * Wake up a task on a specific queue
+ * Tests whether rpc queue is empty
  */
-void rpc_wake_up_queued_task(struct rpc_wait_queue *queue, struct rpc_task *task)
+int rpc_queue_empty(struct rpc_wait_queue *queue)
 {
+       int res;
+
        spin_lock_bh(&queue->lock);
-       rpc_wake_up_task_queue_locked(queue, task);
+       res = queue->qlen;
        spin_unlock_bh(&queue->lock);
+       return res == 0;
 }
-EXPORT_SYMBOL_GPL(rpc_wake_up_queued_task);
+EXPORT_SYMBOL_GPL(rpc_queue_empty);
 
 /*
- * Wake up the specified task
+ * Wake up a task on a specific queue
  */
-static void rpc_wake_up_task(struct rpc_task *task)
+void rpc_wake_up_queued_task(struct rpc_wait_queue *queue, struct rpc_task *task)
 {
-       rpc_wake_up_queued_task(task->tk_waitqueue, task);
+       spin_lock_bh(&queue->lock);
+       rpc_wake_up_task_queue_locked(queue, task);
+       spin_unlock_bh(&queue->lock);
 }
+EXPORT_SYMBOL_GPL(rpc_wake_up_queued_task);
 
 /*
  * Wake up the next task on a priority queue.
  */
-static struct rpc_task * __rpc_wake_up_next_priority(struct rpc_wait_queue *queue)
+static struct rpc_task *__rpc_find_next_queued_priority(struct rpc_wait_queue *queue)
 {
        struct list_head *q;
        struct rpc_task *task;
@@ -455,30 +476,54 @@ new_queue:
 new_owner:
        rpc_set_waitqueue_owner(queue, task->tk_owner);
 out:
-       rpc_wake_up_task_queue_locked(queue, task);
        return task;
 }
 
+static struct rpc_task *__rpc_find_next_queued(struct rpc_wait_queue *queue)
+{
+       if (RPC_IS_PRIORITY(queue))
+               return __rpc_find_next_queued_priority(queue);
+       if (!list_empty(&queue->tasks[0]))
+               return list_first_entry(&queue->tasks[0], struct rpc_task, u.tk_wait.list);
+       return NULL;
+}
+
 /*
- * Wake up the next task on the wait queue.
+ * Wake up the first task on the wait queue.
  */
-struct rpc_task * rpc_wake_up_next(struct rpc_wait_queue *queue)
+struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue,
+               bool (*func)(struct rpc_task *, void *), void *data)
 {
        struct rpc_task *task = NULL;
 
-       dprintk("RPC:       wake_up_next(%p \"%s\")\n",
+       dprintk("RPC:       wake_up_first(%p \"%s\")\n",
                        queue, rpc_qname(queue));
        spin_lock_bh(&queue->lock);
-       if (RPC_IS_PRIORITY(queue))
-               task = __rpc_wake_up_next_priority(queue);
-       else {
-               task_for_first(task, &queue->tasks[0])
+       task = __rpc_find_next_queued(queue);
+       if (task != NULL) {
+               if (func(task, data))
                        rpc_wake_up_task_queue_locked(queue, task);
+               else
+                       task = NULL;
        }
        spin_unlock_bh(&queue->lock);
 
        return task;
 }
+EXPORT_SYMBOL_GPL(rpc_wake_up_first);
+
+static bool rpc_wake_up_next_func(struct rpc_task *task, void *data)
+{
+       return true;
+}
+
+/*
+ * Wake up the next task on the wait queue.
+ */
+struct rpc_task *rpc_wake_up_next(struct rpc_wait_queue *queue)
+{
+       return rpc_wake_up_first(queue, rpc_wake_up_next_func, NULL);
+}
 EXPORT_SYMBOL_GPL(rpc_wake_up_next);
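
rpc_wake_up_first() splits the old wake-next logic into find-then-filter: the predicate is called under queue->lock, and only a true return actually dequeues and wakes the task; on a false return the task stays queued and NULL is returned. rpc_wake_up_next() above is just the always-true caller. A hypothetical filtered caller, to show the shape (the predicate is invented):

	/* wake only a task bound to the given credential */
	static bool rpc_wake_up_cred_func(struct rpc_task *task, void *data)
	{
		return task->tk_msg.rpc_cred == data; /* runs under queue->lock */
	}

	task = rpc_wake_up_first(queue, rpc_wake_up_cred_func, cred);
	if (task == NULL)
		; /* queue empty, or the first queued task did not match */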
 
 /**
@@ -489,14 +534,18 @@ EXPORT_SYMBOL_GPL(rpc_wake_up_next);
  */
 void rpc_wake_up(struct rpc_wait_queue *queue)
 {
-       struct rpc_task *task, *next;
        struct list_head *head;
 
        spin_lock_bh(&queue->lock);
        head = &queue->tasks[queue->maxpriority];
        for (;;) {
-               list_for_each_entry_safe(task, next, head, u.tk_wait.list)
+               while (!list_empty(head)) {
+                       struct rpc_task *task;
+                       task = list_first_entry(head,
+                                       struct rpc_task,
+                                       u.tk_wait.list);
                        rpc_wake_up_task_queue_locked(queue, task);
+               }
                if (head == &queue->tasks[0])
                        break;
                head--;
@@ -514,13 +563,16 @@ EXPORT_SYMBOL_GPL(rpc_wake_up);
  */
 void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
 {
-       struct rpc_task *task, *next;
        struct list_head *head;
 
        spin_lock_bh(&queue->lock);
        head = &queue->tasks[queue->maxpriority];
        for (;;) {
-               list_for_each_entry_safe(task, next, head, u.tk_wait.list) {
+               while (!list_empty(head)) {
+                       struct rpc_task *task;
+                       task = list_first_entry(head,
+                                       struct rpc_task,
+                                       u.tk_wait.list);
                        task->tk_status = status;
                        rpc_wake_up_task_queue_locked(queue, task);
                }
@@ -532,6 +584,35 @@ void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
 }
 EXPORT_SYMBOL_GPL(rpc_wake_up_status);
 
+/**
+ * rpc_wake_up_softconn_status - wake up all SOFTCONN rpc_tasks and set their
+ * status value.
+ * @queue: rpc_wait_queue on which the tasks are sleeping
+ * @status: status value to set
+ *
+ * Grabs queue->lock
+ */
+void rpc_wake_up_softconn_status(struct rpc_wait_queue *queue, int status)
+{
+       struct rpc_task *task, *next;
+       struct list_head *head;
+
+       spin_lock_bh(&queue->lock);
+       head = &queue->tasks[queue->maxpriority];
+       for (;;) {
+               list_for_each_entry_safe(task, next, head, u.tk_wait.list)
+                       if (RPC_IS_SOFTCONN(task)) {
+                               task->tk_status = status;
+                               rpc_wake_up_task_queue_locked(queue, task);
+                       }
+               if (head == &queue->tasks[0])
+                       break;
+               head--;
+       }
+       spin_unlock_bh(&queue->lock);
+}
+EXPORT_SYMBOL_GPL(rpc_wake_up_softconn_status);
+
 static void __rpc_queue_timer_fn(unsigned long ptr)
 {
        struct rpc_wait_queue *queue = (struct rpc_wait_queue *)ptr;
@@ -574,11 +655,32 @@ EXPORT_SYMBOL_GPL(rpc_delay);
 /*
  * Helper to call task->tk_ops->rpc_call_prepare
  */
-static void rpc_prepare_task(struct rpc_task *task)
+void rpc_prepare_task(struct rpc_task *task)
 {
        task->tk_ops->rpc_call_prepare(task, task->tk_calldata);
 }
 
+static void
+rpc_init_task_statistics(struct rpc_task *task)
+{
+       /* Initialize retry counters */
+       task->tk_garb_retry = 2;
+       task->tk_cred_retry = 2;
+       task->tk_rebind_retry = 2;
+
+       /* starting timestamp */
+       task->tk_start = ktime_get();
+}
+
+static void
+rpc_reset_task_statistics(struct rpc_task *task)
+{
+       task->tk_timeouts = 0;
+       task->tk_flags &= ~(RPC_CALL_MAJORSEEN|RPC_TASK_KILLED|RPC_TASK_SENT);
+
+       rpc_init_task_statistics(task);
+}
+
 /*
  * Helper that calls task->tk_ops->rpc_call_done if it exists
  */
@@ -591,10 +693,19 @@ void rpc_exit_task(struct rpc_task *task)
                        WARN_ON(RPC_ASSASSINATED(task));
                        /* Always release the RPC slot and buffer memory */
                        xprt_release(task);
+                       rpc_reset_task_statistics(task);
                }
        }
 }
-EXPORT_SYMBOL_GPL(rpc_exit_task);
+
+void rpc_exit(struct rpc_task *task, int status)
+{
+       task->tk_status = status;
+       task->tk_action = rpc_exit_task;
+       if (RPC_IS_QUEUED(task))
+               rpc_wake_up_queued_task(task->tk_waitqueue, task);
+}
+EXPORT_SYMBOL_GPL(rpc_exit);
 
 void rpc_release_calldata(const struct rpc_call_ops *ops, void *calldata)
 {
@@ -607,7 +718,9 @@ void rpc_release_calldata(const struct rpc_call_ops *ops, void *calldata)
  */
 static void __rpc_execute(struct rpc_task *task)
 {
-       int             status = 0;
+       struct rpc_wait_queue *queue;
+       int task_is_async = RPC_IS_ASYNC(task);
+       int status = 0;
 
        dprintk("RPC: %5u __rpc_execute flags=0x%x\n",
                        task->tk_pid, task->tk_flags);
@@ -615,47 +728,51 @@ static void __rpc_execute(struct rpc_task *task)
        BUG_ON(RPC_IS_QUEUED(task));
 
        for (;;) {
+               void (*do_action)(struct rpc_task *);
 
                /*
-                * Execute any pending callback.
+                * Execute any pending callback first.
                 */
-               if (task->tk_callback) {
-                       void (*save_callback)(struct rpc_task *);
-
+               do_action = task->tk_callback;
+               task->tk_callback = NULL;
+               if (do_action == NULL) {
                        /*
-                        * We set tk_callback to NULL before calling it,
-                        * in case it sets the tk_callback field itself:
+                        * Perform the next FSM step.
+                        * tk_action may be NULL if the task has been killed.
+                        * In particular, note that rpc_killall_tasks may
+                        * do this at any time, so beware when dereferencing.
                         */
-                       save_callback = task->tk_callback;
-                       task->tk_callback = NULL;
-                       save_callback(task);
-               }
-
-               /*
-                * Perform the next FSM step.
-                * tk_action may be NULL when the task has been killed
-                * by someone else.
-                */
-               if (!RPC_IS_QUEUED(task)) {
-                       if (task->tk_action == NULL)
+                       do_action = task->tk_action;
+                       if (do_action == NULL)
                                break;
-                       task->tk_action(task);
                }
+               trace_rpc_task_run_action(task->tk_client, task, task->tk_action);
+               do_action(task);
 
                /*
                 * Lockless check for whether task is sleeping or not.
                 */
                if (!RPC_IS_QUEUED(task))
                        continue;
-               rpc_clear_running(task);
-               if (RPC_IS_ASYNC(task)) {
-                       /* Careful! we may have raced... */
-                       if (RPC_IS_QUEUED(task))
-                               return;
-                       if (rpc_test_and_set_running(task))
-                               return;
+               /*
+                * The queue->lock protects against races with
+                * rpc_make_runnable().
+                *
+                * Note that once we clear RPC_TASK_RUNNING on an asynchronous
+                * rpc_task, rpc_make_runnable() can assign it to a
+                * different workqueue. We therefore cannot assume that the
+                * rpc_task pointer is still safe to dereference.
+                */
+               queue = task->tk_waitqueue;
+               spin_lock_bh(&queue->lock);
+               if (!RPC_IS_QUEUED(task)) {
+                       spin_unlock_bh(&queue->lock);
                        continue;
                }
+               rpc_clear_running(task);
+               spin_unlock_bh(&queue->lock);
+               if (task_is_async)
+                       return;
 
                /* sync task: sleep here */
                dprintk("RPC: %5u sync task going to sleep\n", task->tk_pid);
@@ -672,7 +789,6 @@ static void __rpc_execute(struct rpc_task *task)
                        dprintk("RPC: %5u got signal\n", task->tk_pid);
                        task->tk_flags |= RPC_TASK_KILLED;
                        rpc_exit(task, -ERESTARTSYS);
-                       rpc_wake_up_task(task);
                }
                rpc_set_running(task);
                dprintk("RPC: %5u sync task resuming\n", task->tk_pid);
@@ -696,8 +812,9 @@ static void __rpc_execute(struct rpc_task *task)
 void rpc_execute(struct rpc_task *task)
 {
        rpc_set_active(task);
-       rpc_set_running(task);
-       __rpc_execute(task);
+       rpc_make_runnable(task);
+       if (!RPC_IS_ASYNC(task))
+               __rpc_execute(task);
 }
 
 static void rpc_async_schedule(struct work_struct *work)
@@ -705,11 +822,6 @@ static void rpc_async_schedule(struct work_struct *work)
        __rpc_execute(container_of(work, struct rpc_task, u.tk_work));
 }
 
-struct rpc_buffer {
-       size_t  len;
-       char    data[];
-};
-
 /**
  * rpc_malloc - allocate an RPC buffer
  * @task: RPC task that will use this buffer
@@ -729,10 +841,7 @@ struct rpc_buffer {
 void *rpc_malloc(struct rpc_task *task, size_t size)
 {
        struct rpc_buffer *buf;
-       gfp_t gfp = GFP_NOWAIT;
-
-       if (RPC_IS_SWAPPER(task))
-               gfp |= __GFP_MEMALLOC;
+       gfp_t gfp = RPC_IS_SWAPPER(task) ? GFP_ATOMIC : GFP_NOWAIT;
 
        size += sizeof(struct rpc_buffer);
        if (size <= RPC_BUFFER_MAXSIZE)
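
On the gfp choice above: rpc_malloc() runs in contexts that must not sleep or re-enter the filesystem, so both options fail fast rather than block. GFP_NOWAIT simply fails under pressure (call_allocate() retries via rpc_delay()), while a task flagged RPC_IS_SWAPPER is writing swap pages out over this transport, so it gets GFP_ATOMIC and may dip into the emergency reserves rather than stall reclaim. The ternary, spelled out:

	gfp_t gfp;

	if (RPC_IS_SWAPPER(task))
		gfp = GFP_ATOMIC;	/* swap-out path: may use reserves */
	else
		gfp = GFP_NOWAIT;	/* fail fast; the caller retries */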
@@ -788,40 +897,16 @@ static void rpc_init_task(struct rpc_task *task, const struct rpc_task_setup *ta
        task->tk_calldata = task_setup_data->callback_data;
        INIT_LIST_HEAD(&task->tk_task);
 
-       /* Initialize retry counters */
-       task->tk_garb_retry = 2;
-       task->tk_cred_retry = 2;
-
        task->tk_priority = task_setup_data->priority - RPC_PRIORITY_LOW;
        task->tk_owner = current->tgid;
 
        /* Initialize workqueue for async tasks */
        task->tk_workqueue = task_setup_data->workqueue;
 
-       task->tk_client = task_setup_data->rpc_client;
-       if (task->tk_client != NULL) {
-               kref_get(&task->tk_client->cl_kref);
-               if (task->tk_client->cl_softrtry)
-                       task->tk_flags |= RPC_TASK_SOFT;
-               if (task->tk_client->cl_xprt->swapper)
-                       task->tk_flags |= RPC_TASK_SWAPPER;
-       }
-
        if (task->tk_ops->rpc_call_prepare != NULL)
                task->tk_action = rpc_prepare_task;
 
-       if (task_setup_data->rpc_message != NULL) {
-               task->tk_msg.rpc_proc = task_setup_data->rpc_message->rpc_proc;
-               task->tk_msg.rpc_argp = task_setup_data->rpc_message->rpc_argp;
-               task->tk_msg.rpc_resp = task_setup_data->rpc_message->rpc_resp;
-               /* Bind the user cred */
-               rpcauth_bindcred(task, task_setup_data->rpc_message->rpc_cred, task_setup_data->flags);
-               if (task->tk_action == NULL)
-                       rpc_call_start(task);
-       }
-
-       /* starting timestamp */
-       task->tk_start = jiffies;
+       rpc_init_task_statistics(task);
 
        dprintk("RPC:       new task initialized, procpid %u\n",
                                task_pid_nr(current));
@@ -830,7 +915,7 @@ static void rpc_init_task(struct rpc_task *task, const struct rpc_task_setup *ta
 static struct rpc_task *
 rpc_alloc_task(void)
 {
-       return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOIO);
+       return (struct rpc_task *)mempool_alloc(rpc_task_mempool, GFP_NOFS);
 }
 
 /*
@@ -843,16 +928,17 @@ struct rpc_task *rpc_new_task(const struct rpc_task_setup *setup_data)
 
        if (task == NULL) {
                task = rpc_alloc_task();
-               if (task == NULL)
-                       goto out;
+               if (task == NULL) {
+                       rpc_release_calldata(setup_data->callback_ops,
+                                       setup_data->callback_data);
+                       return ERR_PTR(-ENOMEM);
+               }
                flags = RPC_TASK_DYNAMIC;
        }
 
        rpc_init_task(task, setup_data);
-
        task->tk_flags |= flags;
        dprintk("RPC:       allocated task %p\n", task);
-out:
        return task;
 }
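
rpc_new_task() now owns its failure path: on allocation failure it releases the callback data itself and returns ERR_PTR(-ENOMEM) rather than NULL, so callers switch from a NULL check to the IS_ERR() convention and must not release the calldata a second time. Caller shape, abridged from how rpc_run_task() consumes it:

	struct rpc_task *task;

	task = rpc_new_task(task_setup_data);
	if (IS_ERR(task))
		return task;	/* calldata was already released for us */
	rpc_execute(task);
	return task;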
 
@@ -873,80 +959,70 @@ static void rpc_async_release(struct work_struct *work)
        rpc_free_task(container_of(work, struct rpc_task, u.tk_work));
 }
 
-void rpc_put_task(struct rpc_task *task)
+static void rpc_release_resources_task(struct rpc_task *task)
 {
-       if (!atomic_dec_and_test(&task->tk_count))
-               return;
-       /* Release resources */
        if (task->tk_rqstp)
                xprt_release(task);
-       if (task->tk_msg.rpc_cred)
-               rpcauth_unbindcred(task);
-       if (task->tk_client) {
-               rpc_release_client(task->tk_client);
-               task->tk_client = NULL;
+       if (task->tk_msg.rpc_cred) {
+               put_rpccred(task->tk_msg.rpc_cred);
+               task->tk_msg.rpc_cred = NULL;
        }
-       if (task->tk_workqueue != NULL) {
+       rpc_task_release_client(task);
+}
+
+static void rpc_final_put_task(struct rpc_task *task,
+               struct workqueue_struct *q)
+{
+       if (q != NULL) {
                INIT_WORK(&task->u.tk_work, rpc_async_release);
-               queue_work(task->tk_workqueue, &task->u.tk_work);
+               queue_work(q, &task->u.tk_work);
        } else
                rpc_free_task(task);
 }
-EXPORT_SYMBOL_GPL(rpc_put_task);
 
-static void rpc_release_task(struct rpc_task *task)
+static void rpc_do_put_task(struct rpc_task *task, struct workqueue_struct *q)
 {
-#ifdef RPC_DEBUG
-       BUG_ON(task->tk_magic != RPC_TASK_MAGIC_ID);
-#endif
-       dprintk("RPC: %5u release task\n", task->tk_pid);
-
-       if (!list_empty(&task->tk_task)) {
-               struct rpc_clnt *clnt = task->tk_client;
-               /* Remove from client task list */
-               spin_lock(&clnt->cl_lock);
-               list_del(&task->tk_task);
-               spin_unlock(&clnt->cl_lock);
+       if (atomic_dec_and_test(&task->tk_count)) {
+               rpc_release_resources_task(task);
+               rpc_final_put_task(task, q);
        }
-       BUG_ON (RPC_IS_QUEUED(task));
+}
 
-#ifdef RPC_DEBUG
-       task->tk_magic = 0;
-#endif
-       /* Wake up anyone who is waiting for task completion */
-       rpc_mark_complete_task(task);
+void rpc_put_task(struct rpc_task *task)
+{
+       rpc_do_put_task(task, NULL);
+}
+EXPORT_SYMBOL_GPL(rpc_put_task);
 
-       rpc_put_task(task);
+void rpc_put_task_async(struct rpc_task *task)
+{
+       rpc_do_put_task(task, task->tk_workqueue);
 }
+EXPORT_SYMBOL_GPL(rpc_put_task_async);
 
-/*
- * Kill all tasks for the given client.
- * XXX: kill their descendants as well?
- */
-void rpc_killall_tasks(struct rpc_clnt *clnt)
+static void rpc_release_task(struct rpc_task *task)
 {
-       struct rpc_task *rovr;
+       dprintk("RPC: %5u release task\n", task->tk_pid);
 
+       BUG_ON (RPC_IS_QUEUED(task));
+
+       rpc_release_resources_task(task);
 
-       if (list_empty(&clnt->cl_tasks))
-               return;
-       dprintk("RPC:       killing all tasks for client %p\n", clnt);
        /*
-        * Spin lock all_tasks to prevent changes...
+        * Note: at this point we have been removed from rpc_clnt->cl_tasks,
+        * so it should be safe to use task->tk_count as a test for whether
+        * or not any other processes still hold references to our rpc_task.
         */
-       spin_lock(&clnt->cl_lock);
-       list_for_each_entry(rovr, &clnt->cl_tasks, tk_task) {
-               if (! RPC_IS_ACTIVATED(rovr))
-                       continue;
-               if (!(rovr->tk_flags & RPC_TASK_KILLED)) {
-                       rovr->tk_flags |= RPC_TASK_KILLED;
-                       rpc_exit(rovr, -EIO);
-                       rpc_wake_up_task(rovr);
-               }
+       if (atomic_read(&task->tk_count) != 1 + !RPC_IS_ASYNC(task)) {
+               /* Wake up anyone who may be waiting for task completion */
+               if (!rpc_complete_task(task))
+                       return;
+       } else {
+               if (!atomic_dec_and_test(&task->tk_count))
+                       return;
        }
-       spin_unlock(&clnt->cl_lock);
+       rpc_final_put_task(task, task->tk_workqueue);
 }
-EXPORT_SYMBOL_GPL(rpc_killall_tasks);
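
The "1 + !RPC_IS_ASYNC(task)" comparison above encodes the baseline reference count at release time, as I read it: one reference held by the executing path itself, plus one more for a synchronous task because the rpc_run_task() caller still holds its reference. Anything above the baseline may be a waiter parked in __rpc_wait_for_completion_task(), so the decrement is routed through rpc_complete_task(), where it happens under the bit-waitqueue lock together with the wakeup; at the baseline a bare atomic_dec_and_test() cannot race with anyone:

	/* baseline refs when rpc_release_task() runs (sketch):
	 *   async task: 1  (rpciod's execution reference)
	 *   sync task:  2  (execution reference + the submitter's)
	 *
	 * tk_count == baseline -> no waiter possible: plain dec-and-test
	 * tk_count != baseline -> someone may sleep on RPC_TASK_ACTIVE:
	 *                         decrement + wakeup under wq->lock
	 */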
 
 int rpciod_up(void)
 {
@@ -969,7 +1045,7 @@ static int rpciod_start(void)
         * Create the rpciod thread and wait for it to start.
         */
        dprintk("RPC:       creating workqueue rpciod\n");
-       wq = create_workqueue("rpciod");
+       wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM, 0);
        rpciod_workqueue = wq;
        return rpciod_workqueue != NULL;
 }
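
The create_workqueue() to alloc_workqueue(..., WQ_MEM_RECLAIM, 0) conversion is more than an API rename: WQ_MEM_RECLAIM guarantees the queue a dedicated rescuer thread, so queued RPC work can still execute when memory pressure prevents new worker threads from being spawned. That matters because NFS writeback, and therefore reclaim itself, can depend on rpciod making progress. Lifecycle sketch, abridged from rpciod_start()/rpciod_stop():

	struct workqueue_struct *wq;

	wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM, 0); /* 0 = default max_active */
	if (wq == NULL)
		return 0;		/* rpciod failed to start */
	/* ... rpciod services rpc_async_schedule() work here ... */
	destroy_workqueue(wq);		/* flushes remaining work first */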