fs: dcache scale subdirs
[linux-flexiantxendom0-natty.git] / fs / ceph / inode.c
index 913cafd..2c69444 100644 (file)
@@ -1,8 +1,7 @@
-#include "ceph_debug.h"
+#include <linux/ceph/ceph_debug.h>
 
 #include <linux/module.h>
 #include <linux/fs.h>
-#include <linux/smp_lock.h>
 #include <linux/slab.h>
 #include <linux/string.h>
 #include <linux/uaccess.h>
@@ -13,7 +12,8 @@
 #include <linux/pagevec.h>
 
 #include "super.h"
-#include "decode.h"
+#include "mds_client.h"
+#include <linux/ceph/decode.h>
 
 /*
  * Ceph inode operations
@@ -69,7 +69,7 @@ struct inode *ceph_get_snapdir(struct inode *parent)
 
        BUG_ON(!S_ISDIR(parent->i_mode));
        if (IS_ERR(inode))
-               return ERR_PTR(PTR_ERR(inode));
+               return inode;
        inode->i_mode = parent->i_mode;
        inode->i_uid = parent->i_uid;
        inode->i_gid = parent->i_gid;
@@ -384,7 +384,7 @@ void ceph_destroy_inode(struct inode *inode)
         */
        if (ci->i_snap_realm) {
                struct ceph_mds_client *mdsc =
-                       &ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc;
+                       ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc;
                struct ceph_snap_realm *realm = ci->i_snap_realm;
 
                dout(" dropping residual ref to snap realm %p\n", realm);
@@ -442,8 +442,9 @@ int ceph_fill_file_size(struct inode *inode, int issued,
                         * the file is either opened or mmaped
                         */
                        if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_RD|
-                                     CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER|
-                                     CEPH_CAP_FILE_EXCL)) ||
+                                      CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER|
+                                      CEPH_CAP_FILE_EXCL|
+                                      CEPH_CAP_FILE_LAZYIO)) ||
                            mapping_mapped(inode->i_mapping) ||
                            __ceph_caps_file_wanted(ci)) {
                                ci->i_truncate_pending++;
@@ -469,7 +470,9 @@ void ceph_fill_file_time(struct inode *inode, int issued,
 
        if (issued & (CEPH_CAP_FILE_EXCL|
                      CEPH_CAP_FILE_WR|
-                     CEPH_CAP_FILE_BUFFER)) {
+                     CEPH_CAP_FILE_BUFFER|
+                     CEPH_CAP_AUTH_EXCL|
+                     CEPH_CAP_XATTR_EXCL)) {
                if (timespec_compare(ctime, &inode->i_ctime) > 0) {
                        dout("ctime %ld.%09ld -> %ld.%09ld inc w/ cap\n",
                             inode->i_ctime.tv_sec, inode->i_ctime.tv_nsec,
@@ -509,7 +512,7 @@ void ceph_fill_file_time(struct inode *inode, int issued,
                        warn = 1;
                }
        } else {
-               /* we have no write caps; whatever the MDS says is true */
+               /* we have no write|excl caps; whatever the MDS says is true */
                if (ceph_seq_cmp(time_warp_seq, ci->i_time_warp_seq) >= 0) {
                        inode->i_ctime = *ctime;
                        inode->i_mtime = *mtime;
@@ -565,12 +568,17 @@ static int fill_inode(struct inode *inode,
 
        /*
         * provided version will be odd if inode value is projected,
-        * even if stable.  skip the update if we have a newer info
-        * (e.g., due to inode info racing form multiple MDSs), or if
-        * we are getting projected (unstable) inode info.
+        * even if stable.  skip the update if we have newer stable
+        * info (ours>=theirs, e.g. due to racing mds replies), unless
+        * we are getting projected (unstable) info (in which case the
+        * version is odd, and we want ours>theirs).
+        *   us   them
+        *   2    2     skip
+        *   3    2     skip
+        *   3    3     update
         */
        if (le64_to_cpu(info->version) > 0 &&
-           (ci->i_version & ~1) > le64_to_cpu(info->version))
+           (ci->i_version & ~1) >= le64_to_cpu(info->version))
                goto no_change;
 
        issued = __ceph_caps_issued(ci, &implemented);
@@ -604,7 +612,14 @@ static int fill_inode(struct inode *inode,
                            le32_to_cpu(info->time_warp_seq),
                            &ctime, &mtime, &atime);
 
-       ci->i_max_size = le64_to_cpu(info->max_size);
+       /* only update max_size on auth cap */
+       if ((info->cap.flags & CEPH_CAP_FLAG_AUTH) &&
+           ci->i_max_size != le64_to_cpu(info->max_size)) {
+               dout("max_size %lld -> %llu\n", ci->i_max_size,
+                    le64_to_cpu(info->max_size));
+               ci->i_max_size = le64_to_cpu(info->max_size);
+       }
+
        ci->i_layout = info->layout;
        inode->i_blkbits = fls(le32_to_cpu(info->layout.fl_stripe_unit)) - 1;
 
@@ -675,14 +690,16 @@ static int fill_inode(struct inode *inode,
                /* set dir completion flag? */
                if (ci->i_files == 0 && ci->i_subdirs == 0 &&
                    ceph_snap(inode) == CEPH_NOSNAP &&
-                   (le32_to_cpu(info->cap.caps) & CEPH_CAP_FILE_SHARED)) {
+                   (le32_to_cpu(info->cap.caps) & CEPH_CAP_FILE_SHARED) &&
+                   (issued & CEPH_CAP_FILE_EXCL) == 0 &&
+                   (ci->i_ceph_flags & CEPH_I_COMPLETE) == 0) {
                        dout(" marking %p complete (empty)\n", inode);
                        ci->i_ceph_flags |= CEPH_I_COMPLETE;
                        ci->i_max_offset = 2;
                }
 
                /* it may be better to set st_size in getattr instead? */
-               if (ceph_test_opt(ceph_sb_to_client(inode->i_sb), RBYTES))
+               if (ceph_test_mount_opt(ceph_sb_to_client(inode->i_sb), RBYTES))
                        inode->i_size = ci->i_rbytes;
                break;
        default:
@@ -803,6 +820,39 @@ out_unlock:
 }
 
 /*
+ * If the dir is CEPH_I_COMPLETE, set dentry's directory position based on the
+ * dir's max and move it to the head of d_subdirs so dcache_readdir behaves.
+ */
+static void ceph_set_dentry_offset(struct dentry *dn)
+{
+       struct dentry *dir = dn->d_parent;
+       struct inode *inode = dn->d_parent->d_inode;
+       struct ceph_dentry_info *di;
+
+       BUG_ON(!inode);
+
+       di = ceph_dentry(dn);
+
+       spin_lock(&inode->i_lock);
+       if ((ceph_inode(inode)->i_ceph_flags & CEPH_I_COMPLETE) == 0) {
+               spin_unlock(&inode->i_lock);
+               return;
+       }
+       di->offset = ceph_inode(inode)->i_max_offset++;
+       spin_unlock(&inode->i_lock);
+
+       spin_lock(&dcache_lock);
+       spin_lock(&dir->d_lock);        /* parent first, then child (nested) */
+       spin_lock_nested(&dn->d_lock, DENTRY_D_LOCK_NESTED);
+       list_move(&dn->d_u.d_child, &dir->d_subdirs);   /* to list head */
+       dout("set_dentry_offset %p %lld (%p %p)\n", dn, di->offset,
+            dn->d_u.d_child.prev, dn->d_u.d_child.next);
+       spin_unlock(&dn->d_lock);
+       spin_unlock(&dir->d_lock);
+       spin_unlock(&dcache_lock);
+}
+
+/*
  * splice a dentry to an inode.
  * caller must hold directory i_mutex for this to be safe.
  *
@@ -811,17 +861,19 @@ out_unlock:
  * the caller) if we fail.
  */
 static struct dentry *splice_dentry(struct dentry *dn, struct inode *in,
-                                   bool *prehash)
+                                   bool *prehash, bool set_offset)
 {
        struct dentry *realdn;
 
+       BUG_ON(dn->d_inode);
+
        /* dn must be unhashed */
        if (!d_unhashed(dn))
                d_drop(dn);
        realdn = d_materialise_unique(dn, in);
        if (IS_ERR(realdn)) {
-               pr_err("splice_dentry error %p inode %p ino %llx.%llx\n",
-                      dn, in, ceph_vinop(in));
+               pr_err("splice_dentry error %ld %p inode %p ino %llx.%llx\n",
+                      PTR_ERR(realdn), dn, in, ceph_vinop(in));
                if (prehash)
                        *prehash = false; /* don't rehash on error */
                dn = realdn; /* note realdn contains the error */
@@ -829,51 +881,25 @@ static struct dentry *splice_dentry(struct dentry *dn, struct inode *in,
        } else if (realdn) {
                dout("dn %p (%d) spliced with %p (%d) "
                     "inode %p ino %llx.%llx\n",
-                    dn, atomic_read(&dn->d_count),
-                    realdn, atomic_read(&realdn->d_count),
+                    dn, dn->d_count,
+                    realdn, realdn->d_count,
                     realdn->d_inode, ceph_vinop(realdn->d_inode));
                dput(dn);
                dn = realdn;
        } else {
                BUG_ON(!ceph_dentry(dn));
-
                dout("dn %p attached to %p ino %llx.%llx\n",
                     dn, dn->d_inode, ceph_vinop(dn->d_inode));
        }
        if ((!prehash || *prehash) && d_unhashed(dn))
                d_rehash(dn);
+       if (set_offset)
+               ceph_set_dentry_offset(dn);
 out:
        return dn;
 }
 
 /*
- * Set dentry's directory position based on the current dir's max, and
- * order it in d_subdirs, so that dcache_readdir behaves.
- */
-static void ceph_set_dentry_offset(struct dentry *dn)
-{
-       struct dentry *dir = dn->d_parent;
-       struct inode *inode = dn->d_parent->d_inode;
-       struct ceph_dentry_info *di;
-
-       BUG_ON(!inode);
-
-       di = ceph_dentry(dn);
-
-       spin_lock(&inode->i_lock);
-       di->offset = ceph_inode(inode)->i_max_offset++;
-       spin_unlock(&inode->i_lock);
-
-       spin_lock(&dcache_lock);
-       spin_lock(&dn->d_lock);
-       list_move_tail(&dir->d_subdirs, &dn->d_u.d_child);
-       dout("set_dentry_offset %p %lld (%p %p)\n", dn, di->offset,
-            dn->d_u.d_child.prev, dn->d_u.d_child.next);
-       spin_unlock(&dn->d_lock);
-       spin_unlock(&dcache_lock);
-}
-
-/*
  * Incorporate results into the local cache.  This is either just
  * one inode, or a directory, dentry, and possibly linked-to inode (e.g.,
  * after a lookup).
@@ -891,7 +917,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
        struct inode *in = NULL;
        struct ceph_mds_reply_inode *ininfo;
        struct ceph_vino vino;
-       struct ceph_client *client = ceph_sb_to_client(sb);
+       struct ceph_fs_client *fsc = ceph_sb_to_client(sb);
        int i = 0;
        int err = 0;
 
@@ -934,21 +960,8 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
 
        if (!rinfo->head->is_target && !rinfo->head->is_dentry) {
                dout("fill_trace reply is empty!\n");
-               if (rinfo->head->result == 0 && req->r_locked_dir) {
-                       struct ceph_inode_info *ci =
-                               ceph_inode(req->r_locked_dir);
-                       dout(" clearing %p complete (empty trace)\n",
-                            req->r_locked_dir);
-                       spin_lock(&req->r_locked_dir->i_lock);
-                       ci->i_ceph_flags &= ~CEPH_I_COMPLETE;
-                       ci->i_release_count++;
-                       spin_unlock(&req->r_locked_dir->i_lock);
-
-                       if (req->r_dentry)
-                               ceph_invalidate_dentry_lease(req->r_dentry);
-                       if (req->r_old_dentry)
-                               ceph_invalidate_dentry_lease(req->r_old_dentry);
-               }
+               if (rinfo->head->result == 0 && req->r_locked_dir)
+                       ceph_invalidate_dir_request(req);
                return 0;
        }
 
@@ -968,7 +981,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
         */
        if (rinfo->head->is_dentry && !req->r_aborted &&
            (rinfo->head->is_target || strncmp(req->r_dentry->d_name.name,
-                                              client->mount_args->snapdir_name,
+                                              fsc->mount_options->snapdir_name,
                                               req->r_dentry->d_name.len))) {
                /*
                 * lookup link rename   : null -> possibly existing inode
@@ -1025,6 +1038,9 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
                        ceph_invalidate_dentry_lease(dn);
 
                        /* take overwritten dentry's readdir offset */
+                       dout("dn %p gets %p offset %lld (old offset %lld)\n",
+                            req->r_old_dentry, dn, ceph_dentry(dn)->offset,
+                            ceph_dentry(req->r_old_dentry)->offset);
                        ceph_dentry(req->r_old_dentry)->offset =
                                ceph_dentry(dn)->offset;
 
@@ -1054,7 +1070,8 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
                ininfo = rinfo->targeti.in;
                vino.ino = le64_to_cpu(ininfo->ino);
                vino.snap = le64_to_cpu(ininfo->snapid);
-               if (!dn->d_inode) {
+               in = dn->d_inode;
+               if (!in) {
                        in = ceph_get_inode(sb, vino);
                        if (IS_ERR(in)) {
                                pr_err("fill_trace bad get_inode "
@@ -1063,13 +1080,12 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
                                d_delete(dn);
                                goto done;
                        }
-                       dn = splice_dentry(dn, in, &have_lease);
+                       dn = splice_dentry(dn, in, &have_lease, true);
                        if (IS_ERR(dn)) {
                                err = PTR_ERR(dn);
                                goto done;
                        }
                        req->r_dentry = dn;  /* may have spliced */
-                       ceph_set_dentry_offset(dn);
                        igrab(in);
                } else if (ceph_ino(in) == vino.ino &&
                           ceph_snap(in) == vino.snap) {
@@ -1107,12 +1123,11 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
                        goto done;
                }
                dout(" linking snapped dir %p to dn %p\n", in, dn);
-               dn = splice_dentry(dn, in, NULL);
+               dn = splice_dentry(dn, in, NULL, true);
                if (IS_ERR(dn)) {
                        err = PTR_ERR(dn);
                        goto done;
                }
-               ceph_set_dentry_offset(dn);
                req->r_dentry = dn;  /* may have spliced */
                igrab(in);
                rinfo->head->is_dentry = 1;  /* fool notrace handlers */
@@ -1204,8 +1219,10 @@ retry_lookup:
                                goto out;
                        }
                        err = ceph_init_dentry(dn);
-                       if (err < 0)
+                       if (err < 0) {
+                               dput(dn);
                                goto out;
+                       }
                } else if (dn->d_inode &&
                           (ceph_ino(dn->d_inode) != vino.ino ||
                            ceph_snap(dn->d_inode) != vino.snap)) {
@@ -1217,9 +1234,11 @@ retry_lookup:
                } else {
                        /* reorder parent's d_subdirs */
                        spin_lock(&dcache_lock);
-                       spin_lock(&dn->d_lock);
+                       spin_lock(&parent->d_lock);
+                       spin_lock_nested(&dn->d_lock, DENTRY_D_LOCK_NESTED);
                        list_move(&dn->d_u.d_child, &parent->d_subdirs);
                        spin_unlock(&dn->d_lock);
+                       spin_unlock(&parent->d_lock);
                        spin_unlock(&dcache_lock);
                }
 
@@ -1231,26 +1250,31 @@ retry_lookup:
                        in = dn->d_inode;
                } else {
                        in = ceph_get_inode(parent->d_sb, vino);
-                       if (in == NULL) {
+                       if (IS_ERR(in)) {
                                dout("new_inode badness\n");
                                d_delete(dn);
                                dput(dn);
-                               err = -ENOMEM;
+                               err = PTR_ERR(in);
                                goto out;
                        }
-                       dn = splice_dentry(dn, in, NULL);
+                       dn = splice_dentry(dn, in, NULL, false);
+                       if (IS_ERR(dn))
+                               dn = NULL;
                }
 
                if (fill_inode(in, &rinfo->dir_in[i], NULL, session,
                               req->r_request_started, -1,
                               &req->r_caps_reservation) < 0) {
                        pr_err("fill_inode badness on %p\n", in);
-                       dput(dn);
-                       continue;
+                       goto next_item;
                }
-               update_dentry_lease(dn, rinfo->dir_dlease[i],
-                                   req->r_session, req->r_request_started);
-               dput(dn);
+               if (dn)
+                       update_dentry_lease(dn, rinfo->dir_dlease[i],
+                                           req->r_session,
+                                           req->r_request_started);
+next_item:
+               if (dn)
+                       dput(dn);
        }
        req->r_did_prepopulate = true;
 
@@ -1380,11 +1404,8 @@ static void ceph_invalidate_work(struct work_struct *work)
        spin_lock(&inode->i_lock);
        dout("invalidate_pages %p gen %d revoking %d\n", inode,
             ci->i_rdcache_gen, ci->i_rdcache_revoking);
-       if (ci->i_rdcache_gen == 0 ||
-           ci->i_rdcache_revoking != ci->i_rdcache_gen) {
-               BUG_ON(ci->i_rdcache_revoking > ci->i_rdcache_gen);
+       if (ci->i_rdcache_revoking != ci->i_rdcache_gen) {
                /* nevermind! */
-               ci->i_rdcache_revoking = 0;
                spin_unlock(&inode->i_lock);
                goto out;
        }
@@ -1394,15 +1415,16 @@ static void ceph_invalidate_work(struct work_struct *work)
        ceph_invalidate_nondirty_pages(inode->i_mapping);
 
        spin_lock(&inode->i_lock);
-       if (orig_gen == ci->i_rdcache_gen) {
+       if (orig_gen == ci->i_rdcache_gen &&
+           orig_gen == ci->i_rdcache_revoking) {
                dout("invalidate_pages %p gen %d successful\n", inode,
                     ci->i_rdcache_gen);
-               ci->i_rdcache_gen = 0;
-               ci->i_rdcache_revoking = 0;
+               ci->i_rdcache_revoking--;
                check = 1;
        } else {
-               dout("invalidate_pages %p gen %d raced, gen now %d\n",
-                    inode, orig_gen, ci->i_rdcache_gen);
+               dout("invalidate_pages %p gen %d raced, now %d revoking %d\n",
+                    inode, orig_gen, ci->i_rdcache_gen,
+                    ci->i_rdcache_revoking);
        }
        spin_unlock(&inode->i_lock);
 
@@ -1499,7 +1521,7 @@ retry:
        if (wrbuffer_refs == 0)
                ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
        if (wake)
-               wake_up(&ci->i_cap_wq);
+               wake_up_all(&ci->i_cap_wq);
 }
 
 
@@ -1528,7 +1550,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
        struct inode *parent_inode = dentry->d_parent->d_inode;
        const unsigned int ia_valid = attr->ia_valid;
        struct ceph_mds_request *req;
-       struct ceph_mds_client *mdsc = &ceph_sb_to_client(dentry->d_sb)->mdsc;
+       struct ceph_mds_client *mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc;
        int issued;
        int release = 0, dirtied = 0;
        int mask = 0;
@@ -1723,8 +1745,8 @@ out:
  */
 int ceph_do_getattr(struct inode *inode, int mask)
 {
-       struct ceph_client *client = ceph_sb_to_client(inode->i_sb);
-       struct ceph_mds_client *mdsc = &client->mdsc;
+       struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb);
+       struct ceph_mds_client *mdsc = fsc->mdsc;
        struct ceph_mds_request *req;
        int err;
 
@@ -1733,7 +1755,7 @@ int ceph_do_getattr(struct inode *inode, int mask)
                return 0;
        }
 
-       dout("do_getattr inode %p mask %s\n", inode, ceph_cap_string(mask));
+       dout("do_getattr inode %p mask %s mode 0%o\n", inode, ceph_cap_string(mask), inode->i_mode);
        if (ceph_caps_issued_mask(ceph_inode(inode), mask, 1))
                return 0;