Update to 3.4-final.
[linux-flexiantxendom0-3.2.10.git] / fs / aio.c
index 3006b5b..c51a272 100644 (file)
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -13,7 +13,7 @@
 #include <linux/errno.h>
 #include <linux/time.h>
 #include <linux/aio_abi.h>
-#include <linux/module.h>
+#include <linux/export.h>
 #include <linux/syscalls.h>
 #include <linux/backing-dev.h>
 #include <linux/uio.h>
 #include <linux/security.h>
 #include <linux/eventfd.h>
 #include <linux/blkdev.h>
-#include <linux/mempool.h>
-#include <linux/hash.h>
 #include <linux/compat.h>
 
 #include <asm/kmap_types.h>
 #include <asm/uaccess.h>
 
+#ifdef CONFIG_EPOLL
+#include <linux/poll.h>
+#include <linux/anon_inodes.h>
+#endif
+
 #if DEBUG > 1
 #define dprintk                printk
 #else
@@ -65,14 +68,6 @@ static DECLARE_WORK(fput_work, aio_fput_routine);
 static DEFINE_SPINLOCK(fput_lock);
 static LIST_HEAD(fput_head);
 
-#define AIO_BATCH_HASH_BITS    3 /* allocated on-stack, so don't go crazy */
-#define AIO_BATCH_HASH_SIZE    (1 << AIO_BATCH_HASH_BITS)
-struct aio_batch_entry {
-       struct hlist_node list;
-       struct address_space *mapping;
-};
-mempool_t *abe_pool;
-
 static void aio_kick_handler(struct work_struct *);
 static void aio_queue_work(struct kioctx *);
 
@@ -85,9 +80,8 @@ static int __init aio_setup(void)
        kiocb_cachep = KMEM_CACHE(kiocb, SLAB_HWCACHE_ALIGN|SLAB_PANIC);
        kioctx_cachep = KMEM_CACHE(kioctx,SLAB_HWCACHE_ALIGN|SLAB_PANIC);
 
-       aio_wq = create_workqueue("aio");
-       abe_pool = mempool_create_kmalloc_pool(1, sizeof(struct aio_batch_entry));
-       BUG_ON(!abe_pool);
+       aio_wq = alloc_workqueue("aio", 0, 1);  /* used to limit concurrency */
+       BUG_ON(!aio_wq);
 
        pr_debug("aio_setup: sizeof(struct page) = %d\n", (int)sizeof(struct page));
 
@@ -104,9 +98,8 @@ static void aio_free_ring(struct kioctx *ctx)
                put_page(info->ring_pages[i]);
 
        if (info->mmap_size) {
-               down_write(&ctx->mm->mmap_sem);
-               do_munmap(ctx->mm, info->mmap_base, info->mmap_size);
-               up_write(&ctx->mm->mmap_sem);
+               BUG_ON(ctx->mm != current->mm);
+               vm_munmap(info->mmap_base, info->mmap_size);
        }
 
        if (info->ring_pages && info->ring_pages != info->internal_pages)
@@ -171,7 +164,7 @@ static int aio_setup_ring(struct kioctx *ctx)
 
        info->nr = nr_events;           /* trusted copy */
 
-       ring = kmap_atomic(info->ring_pages[0], KM_USER0);
+       ring = kmap_atomic(info->ring_pages[0]);
        ring->nr = nr_events;   /* user copy */
        ring->id = ctx->user_id;
        ring->head = ring->tail = 0;
@@ -179,47 +172,38 @@ static int aio_setup_ring(struct kioctx *ctx)
        ring->compat_features = AIO_RING_COMPAT_FEATURES;
        ring->incompat_features = AIO_RING_INCOMPAT_FEATURES;
        ring->header_length = sizeof(struct aio_ring);
-       kunmap_atomic(ring, KM_USER0);
+       kunmap_atomic(ring);
 
        return 0;
 }
 
 
 /* aio_ring_event: returns a pointer to the event at the given index from
- * kmap_atomic(, km).  Release the pointer with put_aio_ring_event();
+ * kmap_atomic().  Release the pointer with put_aio_ring_event();
  */
 #define AIO_EVENTS_PER_PAGE    (PAGE_SIZE / sizeof(struct io_event))
 #define AIO_EVENTS_FIRST_PAGE  ((PAGE_SIZE - sizeof(struct aio_ring)) / sizeof(struct io_event))
 #define AIO_EVENTS_OFFSET      (AIO_EVENTS_PER_PAGE - AIO_EVENTS_FIRST_PAGE)
 
-#define aio_ring_event(info, nr, km) ({                                        \
+#define aio_ring_event(info, nr) ({                                    \
        unsigned pos = (nr) + AIO_EVENTS_OFFSET;                        \
        struct io_event *__event;                                       \
        __event = kmap_atomic(                                          \
-                       (info)->ring_pages[pos / AIO_EVENTS_PER_PAGE], km); \
+                       (info)->ring_pages[pos / AIO_EVENTS_PER_PAGE]); \
        __event += pos % AIO_EVENTS_PER_PAGE;                           \
        __event;                                                        \
 })
 
-#define put_aio_ring_event(event, km) do {     \
+#define put_aio_ring_event(event) do {         \
        struct io_event *__event = (event);     \
        (void)__event;                          \
-       kunmap_atomic((void *)((unsigned long)__event & PAGE_MASK), km); \
+       kunmap_atomic((void *)((unsigned long)__event & PAGE_MASK)); \
 } while(0)
 
 static void ctx_rcu_free(struct rcu_head *head)
 {
        struct kioctx *ctx = container_of(head, struct kioctx, rcu_head);
-       unsigned nr_events = ctx->max_reqs;
-
        kmem_cache_free(kioctx_cachep, ctx);
-
-       if (nr_events) {
-               spin_lock(&aio_nr_lock);
-               BUG_ON(aio_nr - nr_events > aio_nr);
-               aio_nr -= nr_events;
-               spin_unlock(&aio_nr_lock);
-       }
 }
 
 /* __put_ioctx
@@ -228,26 +212,34 @@ static void ctx_rcu_free(struct rcu_head *head)
  */
 static void __put_ioctx(struct kioctx *ctx)
 {
+       unsigned nr_events = ctx->max_reqs;
        BUG_ON(ctx->reqs_active);
 
-       cancel_delayed_work(&ctx->wq);
-       cancel_work_sync(&ctx->wq.work);
+       cancel_delayed_work_sync(&ctx->wq);
        aio_free_ring(ctx);
        mmdrop(ctx->mm);
        ctx->mm = NULL;
+       if (nr_events) {
+               spin_lock(&aio_nr_lock);
+               BUG_ON(aio_nr - nr_events > aio_nr);
+               aio_nr -= nr_events;
+               spin_unlock(&aio_nr_lock);
+       }
        pr_debug("__put_ioctx: freeing %p\n", ctx);
        call_rcu(&ctx->rcu_head, ctx_rcu_free);
 }
 
-#define get_ioctx(kioctx) do {                                         \
-       BUG_ON(atomic_read(&(kioctx)->users) <= 0);                     \
-       atomic_inc(&(kioctx)->users);                                   \
-} while (0)
-#define put_ioctx(kioctx) do {                                         \
-       BUG_ON(atomic_read(&(kioctx)->users) <= 0);                     \
-       if (unlikely(atomic_dec_and_test(&(kioctx)->users)))            \
-               __put_ioctx(kioctx);                                    \
-} while (0)
+static inline int try_get_ioctx(struct kioctx *kioctx)
+{
+       return atomic_inc_not_zero(&kioctx->users);
+}
+
+static inline void put_ioctx(struct kioctx *kioctx)
+{
+       BUG_ON(atomic_read(&kioctx->users) <= 0);
+       if (unlikely(atomic_dec_and_test(&kioctx->users)))
+               __put_ioctx(kioctx);
+}
 
 /* ioctx_alloc
  *     Allocates and initializes an ioctx.  Returns an ERR_PTR if it failed.
@@ -256,7 +248,7 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
 {
        struct mm_struct *mm;
        struct kioctx *ctx;
-       int did_sync = 0;
+       int err = -ENOMEM;
 
        /* Prevent overflows */
        if ((nr_events > (0x10000000U / sizeof(struct io_event))) ||
@@ -265,7 +257,7 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
                return ERR_PTR(-EINVAL);
        }
 
-       if ((unsigned long)nr_events > aio_max_nr)
+       if (!nr_events || (unsigned long)nr_events > aio_max_nr)
                return ERR_PTR(-EAGAIN);
 
        ctx = kmem_cache_zalloc(kioctx_cachep, GFP_KERNEL);
@@ -276,7 +268,7 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
        mm = ctx->mm = current->mm;
        atomic_inc(&mm->mm_count);
 
-       atomic_set(&ctx->users, 1);
+       atomic_set(&ctx->users, 2);
        spin_lock_init(&ctx->ctx_lock);
        spin_lock_init(&ctx->ring_info.ring_lock);
        init_waitqueue_head(&ctx->wait);
@@ -289,25 +281,14 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
                goto out_freectx;
 
        /* limit the number of system wide aios */
-       do {
-               spin_lock_bh(&aio_nr_lock);
-               if (aio_nr + nr_events > aio_max_nr ||
-                   aio_nr + nr_events < aio_nr)
-                       ctx->max_reqs = 0;
-               else
-                       aio_nr += ctx->max_reqs;
-               spin_unlock_bh(&aio_nr_lock);
-               if (ctx->max_reqs || did_sync)
-                       break;
-
-               /* wait for rcu callbacks to have completed before giving up */
-               synchronize_rcu();
-               did_sync = 1;
-               ctx->max_reqs = nr_events;
-       } while (1);
-
-       if (ctx->max_reqs == 0)
+       spin_lock(&aio_nr_lock);
+       if (aio_nr + nr_events > aio_max_nr ||
+           aio_nr + nr_events < aio_nr) {
+               spin_unlock(&aio_nr_lock);
                goto out_cleanup;
+       }
+       aio_nr += ctx->max_reqs;
+       spin_unlock(&aio_nr_lock);
 
        /* now link into global list. */
        spin_lock(&mm->ioctx_lock);
@@ -319,27 +300,27 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
        return ctx;
 
 out_cleanup:
-       __put_ioctx(ctx);
-       return ERR_PTR(-EAGAIN);
-
+       err = -EAGAIN;
+       aio_free_ring(ctx);
 out_freectx:
        mmdrop(mm);
        kmem_cache_free(kioctx_cachep, ctx);
-       ctx = ERR_PTR(-ENOMEM);
-
-       dprintk("aio: error allocating ioctx %p\n", ctx);
-       return ctx;
+       dprintk("aio: error allocating ioctx %d\n", err);
+       return ERR_PTR(err);
 }
 
-/* aio_cancel_all
+/* kill_ctx
  *     Cancels all outstanding aio requests on an aio context.  Used 
  *     when the processes owning a context have all exited to encourage 
  *     the rapid destruction of the kioctx.
  */
-static void aio_cancel_all(struct kioctx *ctx)
+static void kill_ctx(struct kioctx *ctx)
 {
        int (*cancel)(struct kiocb *, struct io_event *);
+       struct task_struct *tsk = current;
+       DECLARE_WAITQUEUE(wait, tsk);
        struct io_event res;
+
        spin_lock_irq(&ctx->ctx_lock);
        ctx->dead = 1;
        while (!list_empty(&ctx->active_reqs)) {
@@ -355,15 +336,7 @@ static void aio_cancel_all(struct kioctx *ctx)
                        spin_lock_irq(&ctx->ctx_lock);
                }
        }
-       spin_unlock_irq(&ctx->ctx_lock);
-}
-
-static void wait_for_all_aios(struct kioctx *ctx)
-{
-       struct task_struct *tsk = current;
-       DECLARE_WAITQUEUE(wait, tsk);
 
-       spin_lock_irq(&ctx->ctx_lock);
        if (!ctx->reqs_active)
                goto out;
 
@@ -413,19 +386,24 @@ void exit_aio(struct mm_struct *mm)
                ctx = hlist_entry(mm->ioctx_list.first, struct kioctx, list);
                hlist_del_rcu(&ctx->list);
 
-               aio_cancel_all(ctx);
-
-               wait_for_all_aios(ctx);
-               /*
-                * Ensure we don't leave the ctx on the aio_wq
-                */
-               cancel_work_sync(&ctx->wq.work);
+               kill_ctx(ctx);
 
                if (1 != atomic_read(&ctx->users))
                        printk(KERN_DEBUG
                                "exit_aio:ioctx still alive: %d %d %d\n",
                                atomic_read(&ctx->users), ctx->dead,
                                ctx->reqs_active);
+               /*
+                * We don't need to bother with munmap() here -
+                * exit_mmap(mm) is coming and it'll unmap everything.
+                * Since aio_free_ring() uses non-zero ->mmap_size
+                * as indicator that it needs to unmap the area,
+                * just set it to 0; aio_free_ring() is the only
+                * place that uses ->mmap_size, so it's safe.
+                * That way we get all munmap done to current->mm -
+                * all other callers have ctx->mm == current->mm.
+                */
+               ctx->ring_info.mmap_size = 0;
                put_ioctx(ctx);
        }
 }
@@ -443,8 +421,6 @@ void exit_aio(struct mm_struct *mm)
 static struct kiocb *__aio_get_req(struct kioctx *ctx)
 {
        struct kiocb *req = NULL;
-       struct aio_ring *ring;
-       int okay = 0;
 
        req = kmem_cache_alloc(kiocb_cachep, GFP_KERNEL);
        if (unlikely(!req))
@@ -462,39 +438,123 @@ static struct kiocb *__aio_get_req(struct kioctx *ctx)
        INIT_LIST_HEAD(&req->ki_run_list);
        req->ki_eventfd = NULL;
 
-       /* Check if the completion queue has enough free space to
-        * accept an event from this io.
-        */
+       return req;
+}
+
+/*
+ * struct kiocb's are allocated in batches to reduce the number of
+ * times the ctx lock is acquired and released.
+ */
+#define KIOCB_BATCH_SIZE       32L
+struct kiocb_batch {
+       struct list_head head;
+       long count; /* number of requests left to allocate */
+};
+
+static void kiocb_batch_init(struct kiocb_batch *batch, long total)
+{
+       INIT_LIST_HEAD(&batch->head);
+       batch->count = total;
+}
+
+static void kiocb_batch_free(struct kioctx *ctx, struct kiocb_batch *batch)
+{
+       struct kiocb *req, *n;
+
+       if (list_empty(&batch->head))
+               return;
+
        spin_lock_irq(&ctx->ctx_lock);
-       ring = kmap_atomic(ctx->ring_info.ring_pages[0], KM_USER0);
-       if (ctx->reqs_active < aio_ring_avail(&ctx->ring_info, ring)) {
-               list_add(&req->ki_list, &ctx->active_reqs);
-               ctx->reqs_active++;
-               okay = 1;
+       list_for_each_entry_safe(req, n, &batch->head, ki_batch) {
+               list_del(&req->ki_batch);
+               list_del(&req->ki_list);
+               kmem_cache_free(kiocb_cachep, req);
+               ctx->reqs_active--;
        }
-       kunmap_atomic(ring, KM_USER0);
+       if (unlikely(!ctx->reqs_active && ctx->dead))
+               wake_up_all(&ctx->wait);
        spin_unlock_irq(&ctx->ctx_lock);
+}
 
-       if (!okay) {
-               kmem_cache_free(kiocb_cachep, req);
-               req = NULL;
+/*
+ * Allocate a batch of kiocbs.  This avoids taking and dropping the
+ * context lock a lot during setup.
+ */
+static int kiocb_batch_refill(struct kioctx *ctx, struct kiocb_batch *batch)
+{
+       unsigned short allocated, to_alloc;
+       long avail;
+       bool called_fput = false;
+       struct kiocb *req, *n;
+       struct aio_ring *ring;
+
+       to_alloc = min(batch->count, KIOCB_BATCH_SIZE);
+       for (allocated = 0; allocated < to_alloc; allocated++) {
+               req = __aio_get_req(ctx);
+               if (!req)
+                       /* allocation failed, go with what we've got */
+                       break;
+               list_add(&req->ki_batch, &batch->head);
        }
 
-       return req;
+       if (allocated == 0)
+               goto out;
+
+retry:
+       spin_lock_irq(&ctx->ctx_lock);
+       ring = kmap_atomic(ctx->ring_info.ring_pages[0]);
+
+       avail = aio_ring_avail(&ctx->ring_info, ring) - ctx->reqs_active;
+       BUG_ON(avail < 0);
+       if (avail == 0 && !called_fput) {
+               /*
+                * Handle a potential starvation case.  It is possible that
+                * we hold the last reference on a struct file, causing us
+                * to delay the final fput to non-irq context.  In this case,
+                * ctx->reqs_active is artificially high.  Calling the fput
+                * routine here may free up a slot in the event completion
+                * ring, allowing this allocation to succeed.
+                */
+               kunmap_atomic(ring);
+               spin_unlock_irq(&ctx->ctx_lock);
+               aio_fput_routine(NULL);
+               called_fput = true;
+               goto retry;
+       }
+
+       if (avail < allocated) {
+               /* Trim back the number of requests. */
+               list_for_each_entry_safe(req, n, &batch->head, ki_batch) {
+                       list_del(&req->ki_batch);
+                       kmem_cache_free(kiocb_cachep, req);
+                       if (--allocated <= avail)
+                               break;
+               }
+       }
+
+       batch->count -= allocated;
+       list_for_each_entry(req, &batch->head, ki_batch) {
+               list_add(&req->ki_list, &ctx->active_reqs);
+               ctx->reqs_active++;
+       }
+
+       kunmap_atomic(ring);
+       spin_unlock_irq(&ctx->ctx_lock);
+
+out:
+       return allocated;
 }
 
-static inline struct kiocb *aio_get_req(struct kioctx *ctx)
+static inline struct kiocb *aio_get_req(struct kioctx *ctx,
+                                       struct kiocb_batch *batch)
 {
        struct kiocb *req;
-       /* Handle a potential starvation case -- should be exceedingly rare as 
-        * requests will be stuck on fput_head only if the aio_fput_routine is 
-        * delayed and the requests were the last user of the struct file.
-        */
-       req = __aio_get_req(ctx);
-       if (unlikely(NULL == req)) {
-               aio_fput_routine(NULL);
-               req = __aio_get_req(ctx);
-       }
+
+       if (list_empty(&batch->head))
+               if (kiocb_batch_refill(ctx, batch) == 0)
+                       return NULL;
+       req = list_first_entry(&batch->head, struct kiocb, ki_batch);
+       list_del(&req->ki_batch);
        return req;
 }
 
@@ -512,7 +572,7 @@ static inline void really_put_req(struct kioctx *ctx, struct kiocb *req)
        ctx->reqs_active--;
 
        if (unlikely(!ctx->reqs_active && ctx->dead))
-               wake_up(&ctx->wait);
+               wake_up_all(&ctx->wait);
 }
 
 static void aio_fput_routine(struct work_struct *data)
@@ -530,11 +590,16 @@ static void aio_fput_routine(struct work_struct *data)
                        fput(req->ki_filp);
 
                /* Link the iocb into the context's free list */
+               rcu_read_lock();
                spin_lock_irq(&ctx->ctx_lock);
                really_put_req(ctx, req);
+               /*
+                * at that point ctx might've been killed, but actual
+                * freeing is RCU'd
+                */
                spin_unlock_irq(&ctx->ctx_lock);
+               rcu_read_unlock();
 
-               put_ioctx(ctx);
                spin_lock_irq(&fput_lock);
        }
        spin_unlock_irq(&fput_lock);
@@ -565,11 +630,10 @@ static int __aio_put_req(struct kioctx *ctx, struct kiocb *req)
         * this function will be executed w/out any aio kthread wakeup.
         */
        if (unlikely(!fput_atomic(req->ki_filp))) {
-               get_ioctx(ctx);
                spin_lock(&fput_lock);
                list_add(&req->ki_list, &fput_head);
                spin_unlock(&fput_lock);
-               queue_work(aio_wq, &fput_work);
+               schedule_work(&fput_work);
        } else {
                req->ki_filp = NULL;
                really_put_req(ctx, req);
@@ -601,8 +665,13 @@ static struct kioctx *lookup_ioctx(unsigned long ctx_id)
        rcu_read_lock();
 
        hlist_for_each_entry_rcu(ctx, n, &mm->ioctx_list, list) {
-               if (ctx->user_id == ctx_id && !ctx->dead) {
-                       get_ioctx(ctx);
+               /*
+                * RCU protects us against accessing freed memory but
+                * we have to be careful not to get a reference when the
+                * reference count already dropped to 0 (ctx->dead test
+                * is unreliable because of races).
+                */
+               if (ctx->user_id == ctx_id && !ctx->dead && try_get_ioctx(ctx)){
                        ret = ctx;
                        break;
                }
@@ -712,8 +781,16 @@ static ssize_t aio_run_iocb(struct kiocb *iocb)
         */
        ret = retry(iocb);
 
-       if (ret != -EIOCBRETRY && ret != -EIOCBQUEUED)
+       if (ret != -EIOCBRETRY && ret != -EIOCBQUEUED) {
+               /*
+                * There's no easy way to restart the syscall since other AIO's
+                * may be already running. Just fail this IO with EINTR.
+                */
+               if (unlikely(ret == -ERESTARTSYS || ret == -ERESTARTNOINTR ||
+                            ret == -ERESTARTNOHAND || ret == -ERESTART_RESTARTBLOCK))
+                       ret = -EINTR;
                aio_complete(iocb, ret, 0);
+       }
 out:
        spin_lock_irq(&ctx->ctx_lock);
 
@@ -790,29 +867,12 @@ static void aio_queue_work(struct kioctx * ctx)
        queue_delayed_work(aio_wq, &ctx->wq, timeout);
 }
 
-
 /*
- * aio_run_iocbs:
- *     Process all pending retries queued on the ioctx
- *     run list.
- * Assumes it is operating within the aio issuer's mm
- * context.
- */
-static inline void aio_run_iocbs(struct kioctx *ctx)
-{
-       int requeue;
-
-       spin_lock_irq(&ctx->ctx_lock);
-
-       requeue = __aio_run_iocbs(ctx);
-       spin_unlock_irq(&ctx->ctx_lock);
-       if (requeue)
-               aio_queue_work(ctx);
-}
-
-/*
- * just like aio_run_iocbs, but keeps running them until
- * the list stays empty
+ * aio_run_all_iocbs:
+ *     Process all pending retries queued on the ioctx
+ *     run list, and keep running them until the list
+ *     stays empty.
+ * Assumes it is operating within the aio issuer's mm context.
  */
 static inline void aio_run_all_iocbs(struct kioctx *ctx)
 {
@@ -847,7 +907,7 @@ static void aio_kick_handler(struct work_struct *work)
        unuse_mm(mm);
        set_fs(oldfs);
        /*
-        * we're in a worker thread already, don't use queue_delayed_work,
+        * we're in a worker thread already; no point using non-zero delay
         */
        if (requeue)
                queue_delayed_work(aio_wq, &ctx->wq, 0);
@@ -946,10 +1006,10 @@ int aio_complete(struct kiocb *iocb, long res, long res2)
        if (kiocbIsCancelled(iocb))
                goto put_rq;
 
-       ring = kmap_atomic(info->ring_pages[0], KM_IRQ1);
+       ring = kmap_atomic(info->ring_pages[0]);
 
        tail = info->tail;
-       event = aio_ring_event(info, tail, KM_IRQ0);
+       event = aio_ring_event(info, tail);
        if (++tail >= info->nr)
                tail = 0;
 
@@ -970,8 +1030,8 @@ int aio_complete(struct kiocb *iocb, long res, long res2)
        info->tail = tail;
        ring->tail = tail;
 
-       put_aio_ring_event(event, KM_IRQ0);
-       kunmap_atomic(ring, KM_IRQ1);
+       put_aio_ring_event(event);
+       kunmap_atomic(ring);
 
        pr_debug("added to ring %p at [%lu]\n", iocb, tail);
 
@@ -998,6 +1058,11 @@ put_rq:
        if (waitqueue_active(&ctx->wait))
                wake_up(&ctx->wait);
 
+#ifdef CONFIG_EPOLL
+       if (ctx->file && waitqueue_active(&ctx->poll_wait))
+               wake_up(&ctx->poll_wait);
+#endif
+
        spin_unlock_irqrestore(&ctx->ctx_lock, flags);
        return ret;
 }
@@ -1006,6 +1071,8 @@ EXPORT_SYMBOL(aio_complete);
 /* aio_read_evt
  *     Pull an event off of the ioctx's event ring.  Returns the number of 
  *     events fetched (0 or 1 ;-)
+ *     If ent parameter is 0, just returns the number of events that would
+ *     be fetched.
  *     FIXME: make this use cmpxchg.
  *     TODO: make the ringbuffer user mmap()able (requires FIXME).
  */
@@ -1016,7 +1083,7 @@ static int aio_read_evt(struct kioctx *ioctx, struct io_event *ent)
        unsigned long head;
        int ret = 0;
 
-       ring = kmap_atomic(info->ring_pages[0], KM_USER0);
+       ring = kmap_atomic(info->ring_pages[0]);
        dprintk("in aio_read_evt h%lu t%lu m%lu\n",
                 (unsigned long)ring->head, (unsigned long)ring->tail,
                 (unsigned long)ring->nr);
@@ -1028,18 +1095,22 @@ static int aio_read_evt(struct kioctx *ioctx, struct io_event *ent)
 
        head = ring->head % info->nr;
        if (head != ring->tail) {
-               struct io_event *evp = aio_ring_event(info, head, KM_USER1);
-               *ent = *evp;
-               head = (head + 1) % info->nr;
-               smp_mb(); /* finish reading the event before updatng the head */
-               ring->head = head;
-               ret = 1;
-               put_aio_ring_event(evp, KM_USER1);
+               if (ent) { /* event requested */
+                       struct io_event *evp = aio_ring_event(info, head);
+                       *ent = *evp;
+                       head = (head + 1) % info->nr;
+                       /* finish reading the event before updatng the head */
+                       smp_mb();
+                       ring->head = head;
+                       ret = 1;
+                       put_aio_ring_event(evp);
+               } else /* only need to know availability */
+                       ret = 1;
        }
        spin_unlock(&info->ring_lock);
 
 out:
-       kunmap_atomic(ring, KM_USER0);
+       kunmap_atomic(ring);
        dprintk("leaving aio_read_evt: %d  h%lu t%lu\n", ret,
                 (unsigned long)ring->head, (unsigned long)ring->tail);
        return ret;
@@ -1217,18 +1288,89 @@ static void io_destroy(struct kioctx *ioctx)
        if (likely(!was_dead))
                put_ioctx(ioctx);       /* twice for the list */
 
-       aio_cancel_all(ioctx);
-       wait_for_all_aios(ioctx);
+       kill_ctx(ioctx);
+
+#ifdef CONFIG_EPOLL
+       /* forget the poll file, but it's up to the user to close it */
+       if (ioctx->file) {
+               fput(ioctx->file);
+               ioctx->file->private_data = 0;
+               ioctx->file = 0;
+       }
+#endif
 
        /*
         * Wake up any waiters.  The setting of ctx->dead must be seen
         * by other CPUs at this point.  Right now, we rely on the
         * locking done by the above calls to ensure this consistency.
         */
-       wake_up(&ioctx->wait);
-       put_ioctx(ioctx);       /* once for the lookup */
+       wake_up_all(&ioctx->wait);
 }
 
+#ifdef CONFIG_EPOLL
+
+static int aio_queue_fd_close(struct inode *inode, struct file *file)
+{
+       struct kioctx *ioctx = file->private_data;
+       if (ioctx) {
+               file->private_data = 0;
+               spin_lock_irq(&ioctx->ctx_lock);
+               ioctx->file = 0;
+               spin_unlock_irq(&ioctx->ctx_lock);
+               fput(file);
+       }
+       return 0;
+}
+
+static unsigned int aio_queue_fd_poll(struct file *file, poll_table *wait)
+{      unsigned int pollflags = 0;
+       struct kioctx *ioctx = file->private_data;
+
+       if (ioctx) {
+
+               spin_lock_irq(&ioctx->ctx_lock);
+               /* Insert inside our poll wait queue */
+               poll_wait(file, &ioctx->poll_wait, wait);
+
+               /* Check our condition */
+               if (aio_read_evt(ioctx, 0))
+                       pollflags = POLLIN | POLLRDNORM;
+               spin_unlock_irq(&ioctx->ctx_lock);
+       }
+
+       return pollflags;
+}
+
+static const struct file_operations aioq_fops = {
+       .release        = aio_queue_fd_close,
+       .poll           = aio_queue_fd_poll
+};
+
+/* make_aio_fd:
+ *  Create a file descriptor that can be used to poll the event queue.
+ *  Based on the excellent epoll code.
+ */
+
+static int make_aio_fd(struct kioctx *ioctx)
+{
+       int fd;
+       struct file *file;
+
+       fd = anon_inode_getfd("[aioq]", &aioq_fops, ioctx, 0);
+       if (fd < 0)
+               return fd;
+
+       /* associate the file with the IO context */
+       file = fget(fd);
+       if (!file)
+               return -EBADF;
+       file->private_data = ioctx;
+       ioctx->file = file;
+       init_waitqueue_head(&ioctx->poll_wait);
+       return fd;
+}
+#endif
+
 /* sys_io_setup:
  *     Create an aio_context capable of receiving at least nr_events.
  *     ctxp must not point to an aio_context that already exists, and
@@ -1241,18 +1383,30 @@ static void io_destroy(struct kioctx *ioctx)
  *     resources are available.  May fail with -EFAULT if an invalid
  *     pointer is passed for ctxp.  Will fail with -ENOSYS if not
  *     implemented.
+ *
+ *     To request a selectable fd, the user context has to be initialized
+ *     to 1, instead of 0, and the return value is the fd.
+ *     This keeps the system call compatible, since a non-zero value
+ *     was not allowed so far.
  */
 SYSCALL_DEFINE2(io_setup, unsigned, nr_events, aio_context_t __user *, ctxp)
 {
        struct kioctx *ioctx = NULL;
        unsigned long ctx;
        long ret;
+       int make_fd = 0;
 
        ret = get_user(ctx, ctxp);
        if (unlikely(ret))
                goto out;
 
        ret = -EINVAL;
+#ifdef CONFIG_EPOLL
+       if (ctx == 1) {
+               make_fd = 1;
+               ctx = 0;
+       }
+#endif
        if (unlikely(ctx || nr_events == 0)) {
                pr_debug("EINVAL: io_setup: ctx %lu nr_events %u\n",
                         ctx, nr_events);
@@ -1263,11 +1417,13 @@ SYSCALL_DEFINE2(io_setup, unsigned, nr_events, aio_context_t __user *, ctxp)
        ret = PTR_ERR(ioctx);
        if (!IS_ERR(ioctx)) {
                ret = put_user(ioctx->user_id, ctxp);
-               if (!ret)
-                       return 0;
-
-               get_ioctx(ioctx); /* io_destroy() expects us to hold a ref */
-               io_destroy(ioctx);
+#ifdef CONFIG_EPOLL
+               if (make_fd && !ret)
+                       ret = make_aio_fd(ioctx);
+#endif
+               if (ret < 0)
+                       io_destroy(ioctx);
+               put_ioctx(ioctx);
        }
 
 out:
@@ -1285,6 +1441,7 @@ SYSCALL_DEFINE1(io_destroy, aio_context_t, ctx)
        struct kioctx *ioctx = lookup_ioctx(ctx);
        if (likely(NULL != ioctx)) {
                io_destroy(ioctx);
+               put_ioctx(ioctx);
                return 0;
        }
        pr_debug("EINVAL: io_destroy: invalid context id\n");
@@ -1394,13 +1551,13 @@ static ssize_t aio_setup_vectored_rw(int type, struct kiocb *kiocb, bool compat)
                ret = compat_rw_copy_check_uvector(type,
                                (struct compat_iovec __user *)kiocb->ki_buf,
                                kiocb->ki_nbytes, 1, &kiocb->ki_inline_vec,
-                               &kiocb->ki_iovec);
+                               &kiocb->ki_iovec, 1);
        else
 #endif
                ret = rw_copy_check_uvector(type,
                                (struct iovec __user *)kiocb->ki_buf,
                                kiocb->ki_nbytes, 1, &kiocb->ki_inline_vec,
-                               &kiocb->ki_iovec);
+                               &kiocb->ki_iovec, 1);
        if (ret < 0)
                goto out;
 
@@ -1521,44 +1678,8 @@ static ssize_t aio_setup_iocb(struct kiocb *kiocb, bool compat)
        return 0;
 }
 
-static void aio_batch_add(struct address_space *mapping,
-                         struct hlist_head *batch_hash)
-{
-       struct aio_batch_entry *abe;
-       struct hlist_node *pos;
-       unsigned bucket;
-
-       bucket = hash_ptr(mapping, AIO_BATCH_HASH_BITS);
-       hlist_for_each_entry(abe, pos, &batch_hash[bucket], list) {
-               if (abe->mapping == mapping)
-                       return;
-       }
-
-       abe = mempool_alloc(abe_pool, GFP_KERNEL);
-       BUG_ON(!igrab(mapping->host));
-       abe->mapping = mapping;
-       hlist_add_head(&abe->list, &batch_hash[bucket]);
-       return;
-}
-
-static void aio_batch_free(struct hlist_head *batch_hash)
-{
-       struct aio_batch_entry *abe;
-       struct hlist_node *pos, *n;
-       int i;
-
-       for (i = 0; i < AIO_BATCH_HASH_SIZE; i++) {
-               hlist_for_each_entry_safe(abe, pos, n, &batch_hash[i], list) {
-                       blk_run_address_space(abe->mapping);
-                       iput(abe->mapping->host);
-                       hlist_del(&abe->list);
-                       mempool_free(abe, abe_pool);
-               }
-       }
-}
-
 static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
-                        struct iocb *iocb, struct hlist_head *batch_hash,
+                        struct iocb *iocb, struct kiocb_batch *batch,
                         bool compat)
 {
        struct kiocb *req;
@@ -1585,7 +1706,7 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
        if (unlikely(!file))
                return -EBADF;
 
-       req = aio_get_req(ctx);         /* returns with 2 references to req */
+       req = aio_get_req(ctx, batch);  /* returns with 2 references to req */
        if (unlikely(!req)) {
                fput(file);
                return -EAGAIN;
@@ -1626,6 +1747,23 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
                goto out_put_req;
 
        spin_lock_irq(&ctx->ctx_lock);
+       /*
+        * We could have raced with io_destroy() and are currently holding a
+        * reference to ctx which should be destroyed. We cannot submit IO
+        * since ctx gets freed as soon as io_submit() puts its reference.  The
+        * check here is reliable: io_destroy() sets ctx->dead before waiting
+        * for outstanding IO and the barrier between these two is realized by
+        * unlock of mm->ioctx_lock and lock of ctx->ctx_lock.  Analogously we
+        * increment ctx->reqs_active before checking for ctx->dead and the
+        * barrier is realized by unlock and lock of ctx->ctx_lock. Thus if we
+        * don't see ctx->dead set here, io_destroy() waits for our IO to
+        * finish.
+        */
+       if (ctx->dead) {
+               spin_unlock_irq(&ctx->ctx_lock);
+               ret = -EINVAL;
+               goto out_put_req;
+       }
        aio_run_iocb(req);
        if (!list_empty(&ctx->run_list)) {
                /* drain the run list */
@@ -1633,11 +1771,6 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
                        ;
        }
        spin_unlock_irq(&ctx->ctx_lock);
-       if (req->ki_opcode == IOCB_CMD_PREAD ||
-           req->ki_opcode == IOCB_CMD_PREADV ||
-           req->ki_opcode == IOCB_CMD_PWRITE ||
-           req->ki_opcode == IOCB_CMD_PWRITEV)
-               aio_batch_add(file->f_mapping, batch_hash);
 
        aio_put_req(req);       /* drop extra ref to req */
        return 0;
@@ -1653,12 +1786,16 @@ long do_io_submit(aio_context_t ctx_id, long nr,
 {
        struct kioctx *ctx;
        long ret = 0;
-       int i;
-       struct hlist_head batch_hash[AIO_BATCH_HASH_SIZE] = { { 0, }, };
+       int i = 0;
+       struct blk_plug plug;
+       struct kiocb_batch batch;
 
        if (unlikely(nr < 0))
                return -EINVAL;
 
+       if (unlikely(nr > LONG_MAX/sizeof(*iocbpp)))
+               nr = LONG_MAX/sizeof(*iocbpp);
+
        if (unlikely(!access_ok(VERIFY_READ, iocbpp, (nr*sizeof(*iocbpp)))))
                return -EFAULT;
 
@@ -1668,6 +1805,10 @@ long do_io_submit(aio_context_t ctx_id, long nr,
                return -EINVAL;
        }
 
+       kiocb_batch_init(&batch, nr);
+
+       blk_start_plug(&plug);
+
        /*
         * AKPM: should this return a partial result if some of the IOs were
         * successfully submitted?
@@ -1686,12 +1827,13 @@ long do_io_submit(aio_context_t ctx_id, long nr,
                        break;
                }
 
-               ret = io_submit_one(ctx, user_iocb, &tmp, batch_hash, compat);
+               ret = io_submit_one(ctx, user_iocb, &tmp, &batch, compat);
                if (ret)
                        break;
        }
-       aio_batch_free(batch_hash);
+       blk_finish_plug(&plug);
 
+       kiocb_batch_free(ctx, &batch);
        put_ioctx(ctx);
        return i ? i : ret;
 }
@@ -1816,7 +1958,7 @@ SYSCALL_DEFINE5(io_getevents, aio_context_t, ctx_id,
        long ret = -EINVAL;
 
        if (likely(ioctx)) {
-               if (likely(min_nr <= nr && min_nr >= 0 && nr >= 0))
+               if (likely(min_nr <= nr && min_nr >= 0))
                        ret = read_events(ioctx, min_nr, nr, events, timeout);
                put_ioctx(ioctx);
        }