Merge branch 'for-linus' of git://git.kernel.dk/linux-block
[linux-flexiantxendom0-3.2.10.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
/*
 * Block I/O is counted in sectors.  Several Linux layers (blk, bio,
 * genhd) each interpret a sector, but the default everywhere is
 * 512 bytes.  Naming the shift and the size is only slightly more
 * meaningful than the bare numbers, but it keeps them out of the code.
 */
#define SECTOR_SHIFT    9                       /* log2 of the sector size */
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)  /* sector size in bytes */
52
/* Short and descriptive driver names. */
#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

/* Size limits for the fixed name buffers in struct rbd_device. */
#define RBD_MAX_MD_NAME_LEN     (RBD_MAX_OBJ_NAME_LEN + sizeof(RBD_SUFFIX))
#define RBD_MAX_POOL_NAME_LEN   64
#define RBD_MAX_SNAP_NAME_LEN   32
#define RBD_MAX_OPT_LEN         1024

/* Snapshot name that selects the image head rather than a snapshot. */
#define RBD_SNAP_HEAD_NAME      "-"

/*
 * Device names have the form "rbd#": RBD_DRV_NAME followed by a
 * unique integer identifier.  MAX_INT_FORMAT_WIDTH is used in
 * ensuring DEV_NAME_LEN is large enough for every possible name.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)

/* Initial value of the notify_timeout option. */
#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
75
76 /*
77  * block device image metadata (in-memory version)
78  */
79 struct rbd_image_header {
80         u64 image_size;
81         char block_name[32];
82         __u8 obj_order;
83         __u8 crypt_type;
84         __u8 comp_type;
85         struct ceph_snap_context *snapc;
86         size_t snap_names_len;
87         u64 snap_seq;
88         u32 total_snaps;
89
90         char *snap_names;
91         u64 *snap_sizes;
92
93         u64 obj_version;
94 };
95
/* rbd-specific options parsed by parse_rbd_opts_token(). */
struct rbd_options {
        int     notify_timeout;         /* value of "notify_timeout=%d" */
};
99
100 /*
101  * an instance of the client.  multiple devices may share an rbd client.
102  */
103 struct rbd_client {
104         struct ceph_client      *client;
105         struct rbd_options      *rbd_opts;
106         struct kref             kref;
107         struct list_head        node;
108 };
109
110 /*
111  * a request completion status
112  */
113 struct rbd_req_status {
114         int done;
115         int rc;
116         u64 bytes;
117 };
118
119 /*
120  * a collection of requests
121  */
122 struct rbd_req_coll {
123         int                     total;
124         int                     num_done;
125         struct kref             kref;
126         struct rbd_req_status   status[0];
127 };
128
129 /*
130  * a single io request
131  */
132 struct rbd_request {
133         struct request          *rq;            /* blk layer request */
134         struct bio              *bio;           /* cloned bio */
135         struct page             **pages;        /* list of used pages */
136         u64                     len;
137         int                     coll_index;
138         struct rbd_req_coll     *coll;
139 };
140
141 struct rbd_snap {
142         struct  device          dev;
143         const char              *name;
144         size_t                  size;
145         struct list_head        node;
146         u64                     id;
147 };
148
149 /*
150  * a single device
151  */
152 struct rbd_device {
153         int                     id;             /* blkdev unique id */
154
155         int                     major;          /* blkdev assigned major */
156         struct gendisk          *disk;          /* blkdev's gendisk and rq */
157         struct request_queue    *q;
158
159         struct rbd_client       *rbd_client;
160
161         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
162
163         spinlock_t              lock;           /* queue lock */
164
165         struct rbd_image_header header;
166         char                    obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
167         int                     obj_len;
168         char                    obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
169         char                    pool_name[RBD_MAX_POOL_NAME_LEN];
170         int                     poolid;
171
172         struct ceph_osd_event   *watch_event;
173         struct ceph_osd_request *watch_request;
174
175         /* protects updating the header */
176         struct rw_semaphore     header_rwsem;
177         char                    snap_name[RBD_MAX_SNAP_NAME_LEN];
178         u32 cur_snap;   /* index+1 of current snapshot within snap context
179                            0 - for the head */
180         int read_only;
181
182         struct list_head        node;
183
184         /* list of snapshots */
185         struct list_head        snaps;
186
187         /* sysfs related */
188         struct device           dev;
189 };
190
/* Serializes open/close/setup/teardown. */
static DEFINE_MUTEX(ctl_mutex);

/* All rbd devices, and the lock guarding the list. */
static LIST_HEAD(rbd_dev_list);
static DEFINE_SPINLOCK(rbd_dev_list_lock);

/* All rbd clients, and the lock guarding the list. */
static LIST_HEAD(rbd_client_list);
static DEFINE_SPINLOCK(rbd_client_list_lock);
198
199 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
200 static void rbd_dev_release(struct device *dev);
201 static ssize_t rbd_snap_add(struct device *dev,
202                             struct device_attribute *attr,
203                             const char *buf,
204                             size_t count);
205 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
206                                   struct rbd_snap *snap);
207
208 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
209                        size_t count);
210 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
211                           size_t count);
212
213 static struct bus_attribute rbd_bus_attrs[] = {
214         __ATTR(add, S_IWUSR, NULL, rbd_add),
215         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
216         __ATTR_NULL
217 };
218
219 static struct bus_type rbd_bus_type = {
220         .name           = "rbd",
221         .bus_attrs      = rbd_bus_attrs,
222 };
223
/* Release callback for rbd_root_dev; there is nothing to free. */
static void rbd_root_dev_release(struct device *dev)
{
        /* intentionally empty */
}
227
228 static struct device rbd_root_dev = {
229         .init_name =    "rbd",
230         .release =      rbd_root_dev_release,
231 };
232
233
234 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
235 {
236         return get_device(&rbd_dev->dev);
237 }
238
239 static void rbd_put_dev(struct rbd_device *rbd_dev)
240 {
241         put_device(&rbd_dev->dev);
242 }
243
244 static int __rbd_update_snaps(struct rbd_device *rbd_dev);
245
246 static int rbd_open(struct block_device *bdev, fmode_t mode)
247 {
248         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
249
250         rbd_get_dev(rbd_dev);
251
252         set_device_ro(bdev, rbd_dev->read_only);
253
254         if ((mode & FMODE_WRITE) && rbd_dev->read_only)
255                 return -EROFS;
256
257         return 0;
258 }
259
260 static int rbd_release(struct gendisk *disk, fmode_t mode)
261 {
262         struct rbd_device *rbd_dev = disk->private_data;
263
264         rbd_put_dev(rbd_dev);
265
266         return 0;
267 }
268
269 static const struct block_device_operations rbd_bd_ops = {
270         .owner                  = THIS_MODULE,
271         .open                   = rbd_open,
272         .release                = rbd_release,
273 };
274
275 /*
276  * Initialize an rbd client instance.
277  * We own *opt.
278  */
279 static struct rbd_client *rbd_client_create(struct ceph_options *opt,
280                                             struct rbd_options *rbd_opts)
281 {
282         struct rbd_client *rbdc;
283         int ret = -ENOMEM;
284
285         dout("rbd_client_create\n");
286         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
287         if (!rbdc)
288                 goto out_opt;
289
290         kref_init(&rbdc->kref);
291         INIT_LIST_HEAD(&rbdc->node);
292
293         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
294
295         rbdc->client = ceph_create_client(opt, rbdc, 0, 0);
296         if (IS_ERR(rbdc->client))
297                 goto out_mutex;
298         opt = NULL; /* Now rbdc->client is responsible for opt */
299
300         ret = ceph_open_session(rbdc->client);
301         if (ret < 0)
302                 goto out_err;
303
304         rbdc->rbd_opts = rbd_opts;
305
306         spin_lock(&rbd_client_list_lock);
307         list_add_tail(&rbdc->node, &rbd_client_list);
308         spin_unlock(&rbd_client_list_lock);
309
310         mutex_unlock(&ctl_mutex);
311
312         dout("rbd_client_create created %p\n", rbdc);
313         return rbdc;
314
315 out_err:
316         ceph_destroy_client(rbdc->client);
317 out_mutex:
318         mutex_unlock(&ctl_mutex);
319         kfree(rbdc);
320 out_opt:
321         if (opt)
322                 ceph_destroy_options(opt);
323         return ERR_PTR(ret);
324 }
325
326 /*
327  * Find a ceph client with specific addr and configuration.
328  */
329 static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
330 {
331         struct rbd_client *client_node;
332
333         if (opt->flags & CEPH_OPT_NOSHARE)
334                 return NULL;
335
336         list_for_each_entry(client_node, &rbd_client_list, node)
337                 if (ceph_compare_options(opt, client_node->client) == 0)
338                         return client_node;
339         return NULL;
340 }
341
342 /*
343  * mount options
344  */
345 enum {
346         Opt_notify_timeout,
347         Opt_last_int,
348         /* int args above */
349         Opt_last_string,
350         /* string args above */
351 };
352
353 static match_table_t rbdopt_tokens = {
354         {Opt_notify_timeout, "notify_timeout=%d"},
355         /* int args above */
356         /* string args above */
357         {-1, NULL}
358 };
359
360 static int parse_rbd_opts_token(char *c, void *private)
361 {
362         struct rbd_options *rbdopt = private;
363         substring_t argstr[MAX_OPT_ARGS];
364         int token, intval, ret;
365
366         token = match_token(c, rbdopt_tokens, argstr);
367         if (token < 0)
368                 return -EINVAL;
369
370         if (token < Opt_last_int) {
371                 ret = match_int(&argstr[0], &intval);
372                 if (ret < 0) {
373                         pr_err("bad mount option arg (not int) "
374                                "at '%s'\n", c);
375                         return ret;
376                 }
377                 dout("got int token %d val %d\n", token, intval);
378         } else if (token > Opt_last_int && token < Opt_last_string) {
379                 dout("got string token %d val %s\n", token,
380                      argstr[0].from);
381         } else {
382                 dout("got token %d\n", token);
383         }
384
385         switch (token) {
386         case Opt_notify_timeout:
387                 rbdopt->notify_timeout = intval;
388                 break;
389         default:
390                 BUG_ON(token);
391         }
392         return 0;
393 }
394
395 /*
396  * Get a ceph client with specific addr and configuration, if one does
397  * not exist create it.
398  */
399 static struct rbd_client *rbd_get_client(const char *mon_addr,
400                                          size_t mon_addr_len,
401                                          char *options)
402 {
403         struct rbd_client *rbdc;
404         struct ceph_options *opt;
405         struct rbd_options *rbd_opts;
406
407         rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
408         if (!rbd_opts)
409                 return ERR_PTR(-ENOMEM);
410
411         rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
412
413         opt = ceph_parse_options(options, mon_addr,
414                                 mon_addr + mon_addr_len,
415                                 parse_rbd_opts_token, rbd_opts);
416         if (IS_ERR(opt)) {
417                 kfree(rbd_opts);
418                 return ERR_CAST(opt);
419         }
420
421         spin_lock(&rbd_client_list_lock);
422         rbdc = __rbd_client_find(opt);
423         if (rbdc) {
424                 /* using an existing client */
425                 kref_get(&rbdc->kref);
426                 spin_unlock(&rbd_client_list_lock);
427
428                 ceph_destroy_options(opt);
429                 kfree(rbd_opts);
430
431                 return rbdc;
432         }
433         spin_unlock(&rbd_client_list_lock);
434
435         rbdc = rbd_client_create(opt, rbd_opts);
436
437         if (IS_ERR(rbdc))
438                 kfree(rbd_opts);
439
440         return rbdc;
441 }
442
443 /*
444  * Destroy ceph client
445  *
446  * Caller must hold rbd_client_list_lock.
447  */
448 static void rbd_client_release(struct kref *kref)
449 {
450         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
451
452         dout("rbd_release_client %p\n", rbdc);
453         list_del(&rbdc->node);
454
455         ceph_destroy_client(rbdc->client);
456         kfree(rbdc->rbd_opts);
457         kfree(rbdc);
458 }
459
460 /*
461  * Drop reference to ceph client node. If it's not referenced anymore, release
462  * it.
463  */
464 static void rbd_put_client(struct rbd_device *rbd_dev)
465 {
466         spin_lock(&rbd_client_list_lock);
467         kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
468         spin_unlock(&rbd_client_list_lock);
469         rbd_dev->rbd_client = NULL;
470 }
471
472 /*
473  * Destroy requests collection
474  */
475 static void rbd_coll_release(struct kref *kref)
476 {
477         struct rbd_req_coll *coll =
478                 container_of(kref, struct rbd_req_coll, kref);
479
480         dout("rbd_coll_release %p\n", coll);
481         kfree(coll);
482 }
483
484 /*
485  * Create a new header structure, translate header format from the on-disk
486  * header.
487  */
488 static int rbd_header_from_disk(struct rbd_image_header *header,
489                                  struct rbd_image_header_ondisk *ondisk,
490                                  int allocated_snaps,
491                                  gfp_t gfp_flags)
492 {
493         int i;
494         u32 snap_count;
495
496         if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
497                 return -ENXIO;
498
499         snap_count = le32_to_cpu(ondisk->snap_count);
500         header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
501                                 snap_count * sizeof (*ondisk),
502                                 gfp_flags);
503         if (!header->snapc)
504                 return -ENOMEM;
505
506         header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
507         if (snap_count) {
508                 header->snap_names = kmalloc(header->snap_names_len,
509                                              GFP_KERNEL);
510                 if (!header->snap_names)
511                         goto err_snapc;
512                 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
513                                              GFP_KERNEL);
514                 if (!header->snap_sizes)
515                         goto err_names;
516         } else {
517                 header->snap_names = NULL;
518                 header->snap_sizes = NULL;
519         }
520         memcpy(header->block_name, ondisk->block_name,
521                sizeof(ondisk->block_name));
522
523         header->image_size = le64_to_cpu(ondisk->image_size);
524         header->obj_order = ondisk->options.order;
525         header->crypt_type = ondisk->options.crypt_type;
526         header->comp_type = ondisk->options.comp_type;
527
528         atomic_set(&header->snapc->nref, 1);
529         header->snap_seq = le64_to_cpu(ondisk->snap_seq);
530         header->snapc->num_snaps = snap_count;
531         header->total_snaps = snap_count;
532
533         if (snap_count && allocated_snaps == snap_count) {
534                 for (i = 0; i < snap_count; i++) {
535                         header->snapc->snaps[i] =
536                                 le64_to_cpu(ondisk->snaps[i].id);
537                         header->snap_sizes[i] =
538                                 le64_to_cpu(ondisk->snaps[i].image_size);
539                 }
540
541                 /* copy snapshot names */
542                 memcpy(header->snap_names, &ondisk->snaps[i],
543                         header->snap_names_len);
544         }
545
546         return 0;
547
548 err_names:
549         kfree(header->snap_names);
550 err_snapc:
551         kfree(header->snapc);
552         return -ENOMEM;
553 }
554
555 static int snap_index(struct rbd_image_header *header, int snap_num)
556 {
557         return header->total_snaps - snap_num;
558 }
559
560 static u64 cur_snap_id(struct rbd_device *rbd_dev)
561 {
562         struct rbd_image_header *header = &rbd_dev->header;
563
564         if (!rbd_dev->cur_snap)
565                 return 0;
566
567         return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
568 }
569
570 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
571                         u64 *seq, u64 *size)
572 {
573         int i;
574         char *p = header->snap_names;
575
576         for (i = 0; i < header->total_snaps; i++) {
577                 if (!strcmp(snap_name, p)) {
578
579                         /* Found it.  Pass back its id and/or size */
580
581                         if (seq)
582                                 *seq = header->snapc->snaps[i];
583                         if (size)
584                                 *size = header->snap_sizes[i];
585                         return i;
586                 }
587                 p += strlen(p) + 1;     /* Skip ahead to the next name */
588         }
589         return -ENOENT;
590 }
591
592 static int rbd_header_set_snap(struct rbd_device *dev, u64 *size)
593 {
594         struct rbd_image_header *header = &dev->header;
595         struct ceph_snap_context *snapc = header->snapc;
596         int ret = -ENOENT;
597
598         BUILD_BUG_ON(sizeof (dev->snap_name) < sizeof (RBD_SNAP_HEAD_NAME));
599
600         down_write(&dev->header_rwsem);
601
602         if (!memcmp(dev->snap_name, RBD_SNAP_HEAD_NAME,
603                     sizeof (RBD_SNAP_HEAD_NAME))) {
604                 if (header->total_snaps)
605                         snapc->seq = header->snap_seq;
606                 else
607                         snapc->seq = 0;
608                 dev->cur_snap = 0;
609                 dev->read_only = 0;
610                 if (size)
611                         *size = header->image_size;
612         } else {
613                 ret = snap_by_name(header, dev->snap_name, &snapc->seq, size);
614                 if (ret < 0)
615                         goto done;
616
617                 dev->cur_snap = header->total_snaps - ret;
618                 dev->read_only = 1;
619         }
620
621         ret = 0;
622 done:
623         up_write(&dev->header_rwsem);
624         return ret;
625 }
626
627 static void rbd_header_free(struct rbd_image_header *header)
628 {
629         kfree(header->snapc);
630         kfree(header->snap_names);
631         kfree(header->snap_sizes);
632 }
633
634 /*
635  * get the actual striped segment name, offset and length
636  */
637 static u64 rbd_get_segment(struct rbd_image_header *header,
638                            const char *block_name,
639                            u64 ofs, u64 len,
640                            char *seg_name, u64 *segofs)
641 {
642         u64 seg = ofs >> header->obj_order;
643
644         if (seg_name)
645                 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
646                          "%s.%012llx", block_name, seg);
647
648         ofs = ofs & ((1 << header->obj_order) - 1);
649         len = min_t(u64, len, (1 << header->obj_order) - ofs);
650
651         if (segofs)
652                 *segofs = ofs;
653
654         return len;
655 }
656
657 static int rbd_get_num_segments(struct rbd_image_header *header,
658                                 u64 ofs, u64 len)
659 {
660         u64 start_seg = ofs >> header->obj_order;
661         u64 end_seg = (ofs + len - 1) >> header->obj_order;
662         return end_seg - start_seg + 1;
663 }
664
665 /*
666  * returns the size of an object in the image
667  */
668 static u64 rbd_obj_bytes(struct rbd_image_header *header)
669 {
670         return 1 << header->obj_order;
671 }
672
673 /*
674  * bio helpers
675  */
676
677 static void bio_chain_put(struct bio *chain)
678 {
679         struct bio *tmp;
680
681         while (chain) {
682                 tmp = chain;
683                 chain = chain->bi_next;
684                 bio_put(tmp);
685         }
686 }
687
688 /*
689  * zeros a bio chain, starting at specific offset
690  */
691 static void zero_bio_chain(struct bio *chain, int start_ofs)
692 {
693         struct bio_vec *bv;
694         unsigned long flags;
695         void *buf;
696         int i;
697         int pos = 0;
698
699         while (chain) {
700                 bio_for_each_segment(bv, chain, i) {
701                         if (pos + bv->bv_len > start_ofs) {
702                                 int remainder = max(start_ofs - pos, 0);
703                                 buf = bvec_kmap_irq(bv, &flags);
704                                 memset(buf + remainder, 0,
705                                        bv->bv_len - remainder);
706                                 bvec_kunmap_irq(buf, &flags);
707                         }
708                         pos += bv->bv_len;
709                 }
710
711                 chain = chain->bi_next;
712         }
713 }
714
715 /*
716  * bio_chain_clone - clone a chain of bios up to a certain length.
717  * might return a bio_pair that will need to be released.
718  */
719 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
720                                    struct bio_pair **bp,
721                                    int len, gfp_t gfpmask)
722 {
723         struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
724         int total = 0;
725
726         if (*bp) {
727                 bio_pair_release(*bp);
728                 *bp = NULL;
729         }
730
731         while (old_chain && (total < len)) {
732                 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
733                 if (!tmp)
734                         goto err_out;
735
736                 if (total + old_chain->bi_size > len) {
737                         struct bio_pair *bp;
738
739                         /*
740                          * this split can only happen with a single paged bio,
741                          * split_bio will BUG_ON if this is not the case
742                          */
743                         dout("bio_chain_clone split! total=%d remaining=%d"
744                              "bi_size=%d\n",
745                              (int)total, (int)len-total,
746                              (int)old_chain->bi_size);
747
748                         /* split the bio. We'll release it either in the next
749                            call, or it will have to be released outside */
750                         bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
751                         if (!bp)
752                                 goto err_out;
753
754                         __bio_clone(tmp, &bp->bio1);
755
756                         *next = &bp->bio2;
757                 } else {
758                         __bio_clone(tmp, old_chain);
759                         *next = old_chain->bi_next;
760                 }
761
762                 tmp->bi_bdev = NULL;
763                 gfpmask &= ~__GFP_WAIT;
764                 tmp->bi_next = NULL;
765
766                 if (!new_chain) {
767                         new_chain = tail = tmp;
768                 } else {
769                         tail->bi_next = tmp;
770                         tail = tmp;
771                 }
772                 old_chain = old_chain->bi_next;
773
774                 total += tmp->bi_size;
775         }
776
777         BUG_ON(total < len);
778
779         if (tail)
780                 tail->bi_next = NULL;
781
782         *old = old_chain;
783
784         return new_chain;
785
786 err_out:
787         dout("bio_chain_clone with err\n");
788         bio_chain_put(new_chain);
789         return NULL;
790 }
791
792 /*
793  * helpers for osd request op vectors.
794  */
795 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
796                             int num_ops,
797                             int opcode,
798                             u32 payload_len)
799 {
800         *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
801                        GFP_NOIO);
802         if (!*ops)
803                 return -ENOMEM;
804         (*ops)[0].op = opcode;
805         /*
806          * op extent offset and length will be set later on
807          * in calc_raw_layout()
808          */
809         (*ops)[0].payload_len = payload_len;
810         return 0;
811 }
812
/* Free an op vector allocated by rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
        kfree(ops);
}
817
818 static void rbd_coll_end_req_index(struct request *rq,
819                                    struct rbd_req_coll *coll,
820                                    int index,
821                                    int ret, u64 len)
822 {
823         struct request_queue *q;
824         int min, max, i;
825
826         dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
827              coll, index, ret, len);
828
829         if (!rq)
830                 return;
831
832         if (!coll) {
833                 blk_end_request(rq, ret, len);
834                 return;
835         }
836
837         q = rq->q;
838
839         spin_lock_irq(q->queue_lock);
840         coll->status[index].done = 1;
841         coll->status[index].rc = ret;
842         coll->status[index].bytes = len;
843         max = min = coll->num_done;
844         while (max < coll->total && coll->status[max].done)
845                 max++;
846
847         for (i = min; i<max; i++) {
848                 __blk_end_request(rq, coll->status[i].rc,
849                                   coll->status[i].bytes);
850                 coll->num_done++;
851                 kref_put(&coll->kref, rbd_coll_release);
852         }
853         spin_unlock_irq(q->queue_lock);
854 }
855
856 static void rbd_coll_end_req(struct rbd_request *req,
857                              int ret, u64 len)
858 {
859         rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
860 }
861
862 /*
863  * Send ceph osd request
864  */
865 static int rbd_do_request(struct request *rq,
866                           struct rbd_device *dev,
867                           struct ceph_snap_context *snapc,
868                           u64 snapid,
869                           const char *obj, u64 ofs, u64 len,
870                           struct bio *bio,
871                           struct page **pages,
872                           int num_pages,
873                           int flags,
874                           struct ceph_osd_req_op *ops,
875                           int num_reply,
876                           struct rbd_req_coll *coll,
877                           int coll_index,
878                           void (*rbd_cb)(struct ceph_osd_request *req,
879                                          struct ceph_msg *msg),
880                           struct ceph_osd_request **linger_req,
881                           u64 *ver)
882 {
883         struct ceph_osd_request *req;
884         struct ceph_file_layout *layout;
885         int ret;
886         u64 bno;
887         struct timespec mtime = CURRENT_TIME;
888         struct rbd_request *req_data;
889         struct ceph_osd_request_head *reqhead;
890         struct ceph_osd_client *osdc;
891
892         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
893         if (!req_data) {
894                 if (coll)
895                         rbd_coll_end_req_index(rq, coll, coll_index,
896                                                -ENOMEM, len);
897                 return -ENOMEM;
898         }
899
900         if (coll) {
901                 req_data->coll = coll;
902                 req_data->coll_index = coll_index;
903         }
904
905         dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);
906
907         down_read(&dev->header_rwsem);
908
909         osdc = &dev->rbd_client->client->osdc;
910         req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
911                                         false, GFP_NOIO, pages, bio);
912         if (!req) {
913                 up_read(&dev->header_rwsem);
914                 ret = -ENOMEM;
915                 goto done_pages;
916         }
917
918         req->r_callback = rbd_cb;
919
920         req_data->rq = rq;
921         req_data->bio = bio;
922         req_data->pages = pages;
923         req_data->len = len;
924
925         req->r_priv = req_data;
926
927         reqhead = req->r_request->front.iov_base;
928         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
929
930         strncpy(req->r_oid, obj, sizeof(req->r_oid));
931         req->r_oid_len = strlen(req->r_oid);
932
933         layout = &req->r_file_layout;
934         memset(layout, 0, sizeof(*layout));
935         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
936         layout->fl_stripe_count = cpu_to_le32(1);
937         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
938         layout->fl_pg_preferred = cpu_to_le32(-1);
939         layout->fl_pg_pool = cpu_to_le32(dev->poolid);
940         ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
941                                 req, ops);
942
943         ceph_osdc_build_request(req, ofs, &len,
944                                 ops,
945                                 snapc,
946                                 &mtime,
947                                 req->r_oid, req->r_oid_len);
948         up_read(&dev->header_rwsem);
949
950         if (linger_req) {
951                 ceph_osdc_set_request_linger(osdc, req);
952                 *linger_req = req;
953         }
954
955         ret = ceph_osdc_start_request(osdc, req, false);
956         if (ret < 0)
957                 goto done_err;
958
959         if (!rbd_cb) {
960                 ret = ceph_osdc_wait_request(osdc, req);
961                 if (ver)
962                         *ver = le64_to_cpu(req->r_reassert_version.version);
963                 dout("reassert_ver=%lld\n",
964                      le64_to_cpu(req->r_reassert_version.version));
965                 ceph_osdc_put_request(req);
966         }
967         return ret;
968
969 done_err:
970         bio_chain_put(req_data->bio);
971         ceph_osdc_put_request(req);
972 done_pages:
973         rbd_coll_end_req(req_data, ret, len);
974         kfree(req_data);
975         return ret;
976 }
977
978 /*
979  * Ceph osd op callback
980  */
981 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
982 {
983         struct rbd_request *req_data = req->r_priv;
984         struct ceph_osd_reply_head *replyhead;
985         struct ceph_osd_op *op;
986         __s32 rc;
987         u64 bytes;
988         int read_op;
989
990         /* parse reply */
991         replyhead = msg->front.iov_base;
992         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
993         op = (void *)(replyhead + 1);
994         rc = le32_to_cpu(replyhead->result);
995         bytes = le64_to_cpu(op->extent.length);
996         read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);
997
998         dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
999
1000         if (rc == -ENOENT && read_op) {
1001                 zero_bio_chain(req_data->bio, 0);
1002                 rc = 0;
1003         } else if (rc == 0 && read_op && bytes < req_data->len) {
1004                 zero_bio_chain(req_data->bio, bytes);
1005                 bytes = req_data->len;
1006         }
1007
1008         rbd_coll_end_req(req_data, rc, bytes);
1009
1010         if (req_data->bio)
1011                 bio_chain_put(req_data->bio);
1012
1013         ceph_osdc_put_request(req);
1014         kfree(req_data);
1015 }
1016
1017 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1018 {
1019         ceph_osdc_put_request(req);
1020 }
1021
1022 /*
1023  * Do a synchronous ceph osd operation
1024  */
1025 static int rbd_req_sync_op(struct rbd_device *dev,
1026                            struct ceph_snap_context *snapc,
1027                            u64 snapid,
1028                            int opcode,
1029                            int flags,
1030                            struct ceph_osd_req_op *orig_ops,
1031                            int num_reply,
1032                            const char *obj,
1033                            u64 ofs, u64 len,
1034                            char *buf,
1035                            struct ceph_osd_request **linger_req,
1036                            u64 *ver)
1037 {
1038         int ret;
1039         struct page **pages;
1040         int num_pages;
1041         struct ceph_osd_req_op *ops = orig_ops;
1042         u32 payload_len;
1043
1044         num_pages = calc_pages_for(ofs , len);
1045         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1046         if (IS_ERR(pages))
1047                 return PTR_ERR(pages);
1048
1049         if (!orig_ops) {
1050                 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
1051                 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1052                 if (ret < 0)
1053                         goto done;
1054
1055                 if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
1056                         ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
1057                         if (ret < 0)
1058                                 goto done_ops;
1059                 }
1060         }
1061
1062         ret = rbd_do_request(NULL, dev, snapc, snapid,
1063                           obj, ofs, len, NULL,
1064                           pages, num_pages,
1065                           flags,
1066                           ops,
1067                           2,
1068                           NULL, 0,
1069                           NULL,
1070                           linger_req, ver);
1071         if (ret < 0)
1072                 goto done_ops;
1073
1074         if ((flags & CEPH_OSD_FLAG_READ) && buf)
1075                 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1076
1077 done_ops:
1078         if (!orig_ops)
1079                 rbd_destroy_ops(ops);
1080 done:
1081         ceph_release_page_vector(pages, num_pages);
1082         return ret;
1083 }
1084
1085 /*
1086  * Do an asynchronous ceph osd operation
1087  */
1088 static int rbd_do_op(struct request *rq,
1089                      struct rbd_device *rbd_dev ,
1090                      struct ceph_snap_context *snapc,
1091                      u64 snapid,
1092                      int opcode, int flags, int num_reply,
1093                      u64 ofs, u64 len,
1094                      struct bio *bio,
1095                      struct rbd_req_coll *coll,
1096                      int coll_index)
1097 {
1098         char *seg_name;
1099         u64 seg_ofs;
1100         u64 seg_len;
1101         int ret;
1102         struct ceph_osd_req_op *ops;
1103         u32 payload_len;
1104
1105         seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1106         if (!seg_name)
1107                 return -ENOMEM;
1108
1109         seg_len = rbd_get_segment(&rbd_dev->header,
1110                                   rbd_dev->header.block_name,
1111                                   ofs, len,
1112                                   seg_name, &seg_ofs);
1113
1114         payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1115
1116         ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1117         if (ret < 0)
1118                 goto done;
1119
1120         /* we've taken care of segment sizes earlier when we
1121            cloned the bios. We should never have a segment
1122            truncated at this point */
1123         BUG_ON(seg_len < len);
1124
1125         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1126                              seg_name, seg_ofs, seg_len,
1127                              bio,
1128                              NULL, 0,
1129                              flags,
1130                              ops,
1131                              num_reply,
1132                              coll, coll_index,
1133                              rbd_req_cb, 0, NULL);
1134
1135         rbd_destroy_ops(ops);
1136 done:
1137         kfree(seg_name);
1138         return ret;
1139 }
1140
1141 /*
1142  * Request async osd write
1143  */
1144 static int rbd_req_write(struct request *rq,
1145                          struct rbd_device *rbd_dev,
1146                          struct ceph_snap_context *snapc,
1147                          u64 ofs, u64 len,
1148                          struct bio *bio,
1149                          struct rbd_req_coll *coll,
1150                          int coll_index)
1151 {
1152         return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1153                          CEPH_OSD_OP_WRITE,
1154                          CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1155                          2,
1156                          ofs, len, bio, coll, coll_index);
1157 }
1158
1159 /*
1160  * Request async osd read
1161  */
1162 static int rbd_req_read(struct request *rq,
1163                          struct rbd_device *rbd_dev,
1164                          u64 snapid,
1165                          u64 ofs, u64 len,
1166                          struct bio *bio,
1167                          struct rbd_req_coll *coll,
1168                          int coll_index)
1169 {
1170         return rbd_do_op(rq, rbd_dev, NULL,
1171                          (snapid ? snapid : CEPH_NOSNAP),
1172                          CEPH_OSD_OP_READ,
1173                          CEPH_OSD_FLAG_READ,
1174                          2,
1175                          ofs, len, bio, coll, coll_index);
1176 }
1177
1178 /*
1179  * Request sync osd read
1180  */
1181 static int rbd_req_sync_read(struct rbd_device *dev,
1182                           struct ceph_snap_context *snapc,
1183                           u64 snapid,
1184                           const char *obj,
1185                           u64 ofs, u64 len,
1186                           char *buf,
1187                           u64 *ver)
1188 {
1189         return rbd_req_sync_op(dev, NULL,
1190                                (snapid ? snapid : CEPH_NOSNAP),
1191                                CEPH_OSD_OP_READ,
1192                                CEPH_OSD_FLAG_READ,
1193                                NULL,
1194                                1, obj, ofs, len, buf, NULL, ver);
1195 }
1196
1197 /*
1198  * Request sync osd watch
1199  */
1200 static int rbd_req_sync_notify_ack(struct rbd_device *dev,
1201                                    u64 ver,
1202                                    u64 notify_id,
1203                                    const char *obj)
1204 {
1205         struct ceph_osd_req_op *ops;
1206         struct page **pages = NULL;
1207         int ret;
1208
1209         ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1210         if (ret < 0)
1211                 return ret;
1212
1213         ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
1214         ops[0].watch.cookie = notify_id;
1215         ops[0].watch.flag = 0;
1216
1217         ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
1218                           obj, 0, 0, NULL,
1219                           pages, 0,
1220                           CEPH_OSD_FLAG_READ,
1221                           ops,
1222                           1,
1223                           NULL, 0,
1224                           rbd_simple_req_cb, 0, NULL);
1225
1226         rbd_destroy_ops(ops);
1227         return ret;
1228 }
1229
1230 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1231 {
1232         struct rbd_device *dev = (struct rbd_device *)data;
1233         int rc;
1234
1235         if (!dev)
1236                 return;
1237
1238         dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1239                 notify_id, (int)opcode);
1240         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1241         rc = __rbd_update_snaps(dev);
1242         mutex_unlock(&ctl_mutex);
1243         if (rc)
1244                 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1245                            " update snaps: %d\n", dev->major, rc);
1246
1247         rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
1248 }
1249
1250 /*
1251  * Request sync osd watch
1252  */
1253 static int rbd_req_sync_watch(struct rbd_device *dev,
1254                               const char *obj,
1255                               u64 ver)
1256 {
1257         struct ceph_osd_req_op *ops;
1258         struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
1259
1260         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1261         if (ret < 0)
1262                 return ret;
1263
1264         ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1265                                      (void *)dev, &dev->watch_event);
1266         if (ret < 0)
1267                 goto fail;
1268
1269         ops[0].watch.ver = cpu_to_le64(ver);
1270         ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1271         ops[0].watch.flag = 1;
1272
1273         ret = rbd_req_sync_op(dev, NULL,
1274                               CEPH_NOSNAP,
1275                               0,
1276                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1277                               ops,
1278                               1, obj, 0, 0, NULL,
1279                               &dev->watch_request, NULL);
1280
1281         if (ret < 0)
1282                 goto fail_event;
1283
1284         rbd_destroy_ops(ops);
1285         return 0;
1286
1287 fail_event:
1288         ceph_osdc_cancel_event(dev->watch_event);
1289         dev->watch_event = NULL;
1290 fail:
1291         rbd_destroy_ops(ops);
1292         return ret;
1293 }
1294
1295 /*
1296  * Request sync osd unwatch
1297  */
1298 static int rbd_req_sync_unwatch(struct rbd_device *dev,
1299                                 const char *obj)
1300 {
1301         struct ceph_osd_req_op *ops;
1302
1303         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1304         if (ret < 0)
1305                 return ret;
1306
1307         ops[0].watch.ver = 0;
1308         ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1309         ops[0].watch.flag = 0;
1310
1311         ret = rbd_req_sync_op(dev, NULL,
1312                               CEPH_NOSNAP,
1313                               0,
1314                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1315                               ops,
1316                               1, obj, 0, 0, NULL, NULL, NULL);
1317
1318         rbd_destroy_ops(ops);
1319         ceph_osdc_cancel_event(dev->watch_event);
1320         dev->watch_event = NULL;
1321         return ret;
1322 }
1323
/* Holds a pointer to the rbd device associated with a notify request. */
struct rbd_notify_info {
        struct rbd_device *dev;
};
1327
1328 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1329 {
1330         struct rbd_device *dev = (struct rbd_device *)data;
1331         if (!dev)
1332                 return;
1333
1334         dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1335                 notify_id, (int)opcode);
1336 }
1337
1338 /*
1339  * Request sync osd notify
1340  */
1341 static int rbd_req_sync_notify(struct rbd_device *dev,
1342                           const char *obj)
1343 {
1344         struct ceph_osd_req_op *ops;
1345         struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
1346         struct ceph_osd_event *event;
1347         struct rbd_notify_info info;
1348         int payload_len = sizeof(u32) + sizeof(u32);
1349         int ret;
1350
1351         ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
1352         if (ret < 0)
1353                 return ret;
1354
1355         info.dev = dev;
1356
1357         ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1358                                      (void *)&info, &event);
1359         if (ret < 0)
1360                 goto fail;
1361
1362         ops[0].watch.ver = 1;
1363         ops[0].watch.flag = 1;
1364         ops[0].watch.cookie = event->cookie;
1365         ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1366         ops[0].watch.timeout = 12;
1367
1368         ret = rbd_req_sync_op(dev, NULL,
1369                                CEPH_NOSNAP,
1370                                0,
1371                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1372                                ops,
1373                                1, obj, 0, 0, NULL, NULL, NULL);
1374         if (ret < 0)
1375                 goto fail_event;
1376
1377         ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1378         dout("ceph_osdc_wait_event returned %d\n", ret);
1379         rbd_destroy_ops(ops);
1380         return 0;
1381
1382 fail_event:
1383         ceph_osdc_cancel_event(event);
1384 fail:
1385         rbd_destroy_ops(ops);
1386         return ret;
1387 }
1388
1389 /*
1390  * Request sync osd read
1391  */
1392 static int rbd_req_sync_exec(struct rbd_device *dev,
1393                              const char *obj,
1394                              const char *cls,
1395                              const char *method,
1396                              const char *data,
1397                              int len,
1398                              u64 *ver)
1399 {
1400         struct ceph_osd_req_op *ops;
1401         int cls_len = strlen(cls);
1402         int method_len = strlen(method);
1403         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1404                                     cls_len + method_len + len);
1405         if (ret < 0)
1406                 return ret;
1407
1408         ops[0].cls.class_name = cls;
1409         ops[0].cls.class_len = (__u8)cls_len;
1410         ops[0].cls.method_name = method;
1411         ops[0].cls.method_len = (__u8)method_len;
1412         ops[0].cls.argc = 0;
1413         ops[0].cls.indata = data;
1414         ops[0].cls.indata_len = len;
1415
1416         ret = rbd_req_sync_op(dev, NULL,
1417                                CEPH_NOSNAP,
1418                                0,
1419                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1420                                ops,
1421                                1, obj, 0, 0, NULL, NULL, ver);
1422
1423         rbd_destroy_ops(ops);
1424
1425         dout("cls_exec returned %d\n", ret);
1426         return ret;
1427 }
1428
1429 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1430 {
1431         struct rbd_req_coll *coll =
1432                         kzalloc(sizeof(struct rbd_req_coll) +
1433                                 sizeof(struct rbd_req_status) * num_reqs,
1434                                 GFP_ATOMIC);
1435
1436         if (!coll)
1437                 return NULL;
1438         coll->total = num_reqs;
1439         kref_init(&coll->kref);
1440         return coll;
1441 }
1442
1443 /*
1444  * block device queue callback
1445  */
1446 static void rbd_rq_fn(struct request_queue *q)
1447 {
1448         struct rbd_device *rbd_dev = q->queuedata;
1449         struct request *rq;
1450         struct bio_pair *bp = NULL;
1451
1452         while ((rq = blk_fetch_request(q))) {
1453                 struct bio *bio;
1454                 struct bio *rq_bio, *next_bio = NULL;
1455                 bool do_write;
1456                 int size, op_size = 0;
1457                 u64 ofs;
1458                 int num_segs, cur_seg = 0;
1459                 struct rbd_req_coll *coll;
1460
1461                 /* peek at request from block layer */
1462                 if (!rq)
1463                         break;
1464
1465                 dout("fetched request\n");
1466
1467                 /* filter out block requests we don't understand */
1468                 if ((rq->cmd_type != REQ_TYPE_FS)) {
1469                         __blk_end_request_all(rq, 0);
1470                         continue;
1471                 }
1472
1473                 /* deduce our operation (read, write) */
1474                 do_write = (rq_data_dir(rq) == WRITE);
1475
1476                 size = blk_rq_bytes(rq);
1477                 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1478                 rq_bio = rq->bio;
1479                 if (do_write && rbd_dev->read_only) {
1480                         __blk_end_request_all(rq, -EROFS);
1481                         continue;
1482                 }
1483
1484                 spin_unlock_irq(q->queue_lock);
1485
1486                 dout("%s 0x%x bytes at 0x%llx\n",
1487                      do_write ? "write" : "read",
1488                      size, blk_rq_pos(rq) * SECTOR_SIZE);
1489
1490                 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1491                 coll = rbd_alloc_coll(num_segs);
1492                 if (!coll) {
1493                         spin_lock_irq(q->queue_lock);
1494                         __blk_end_request_all(rq, -ENOMEM);
1495                         continue;
1496                 }
1497
1498                 do {
1499                         /* a bio clone to be passed down to OSD req */
1500                         dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1501                         op_size = rbd_get_segment(&rbd_dev->header,
1502                                                   rbd_dev->header.block_name,
1503                                                   ofs, size,
1504                                                   NULL, NULL);
1505                         kref_get(&coll->kref);
1506                         bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1507                                               op_size, GFP_ATOMIC);
1508                         if (!bio) {
1509                                 rbd_coll_end_req_index(rq, coll, cur_seg,
1510                                                        -ENOMEM, op_size);
1511                                 goto next_seg;
1512                         }
1513
1514
1515                         /* init OSD command: write or read */
1516                         if (do_write)
1517                                 rbd_req_write(rq, rbd_dev,
1518                                               rbd_dev->header.snapc,
1519                                               ofs,
1520                                               op_size, bio,
1521                                               coll, cur_seg);
1522                         else
1523                                 rbd_req_read(rq, rbd_dev,
1524                                              cur_snap_id(rbd_dev),
1525                                              ofs,
1526                                              op_size, bio,
1527                                              coll, cur_seg);
1528
1529 next_seg:
1530                         size -= op_size;
1531                         ofs += op_size;
1532
1533                         cur_seg++;
1534                         rq_bio = next_bio;
1535                 } while (size > 0);
1536                 kref_put(&coll->kref, rbd_coll_release);
1537
1538                 if (bp)
1539                         bio_pair_release(bp);
1540                 spin_lock_irq(q->queue_lock);
1541         }
1542 }
1543
1544 /*
1545  * a queue callback. Makes sure that we don't create a bio that spans across
1546  * multiple osd objects. One exception would be with a single page bios,
1547  * which we handle later at bio_chain_clone
1548  */
1549 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1550                           struct bio_vec *bvec)
1551 {
1552         struct rbd_device *rbd_dev = q->queuedata;
1553         unsigned int chunk_sectors;
1554         sector_t sector;
1555         unsigned int bio_sectors;
1556         int max;
1557
1558         chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1559         sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1560         bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1561
1562         max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1563                                  + bio_sectors)) << SECTOR_SHIFT;
1564         if (max < 0)
1565                 max = 0; /* bio_add cannot handle a negative return */
1566         if (max <= bvec->bv_len && bio_sectors == 0)
1567                 return bvec->bv_len;
1568         return max;
1569 }
1570
1571 static void rbd_free_disk(struct rbd_device *rbd_dev)
1572 {
1573         struct gendisk *disk = rbd_dev->disk;
1574
1575         if (!disk)
1576                 return;
1577
1578         rbd_header_free(&rbd_dev->header);
1579
1580         if (disk->flags & GENHD_FL_UP)
1581                 del_gendisk(disk);
1582         if (disk->queue)
1583                 blk_cleanup_queue(disk->queue);
1584         put_disk(disk);
1585 }
1586
1587 /*
1588  * reload the ondisk the header 
1589  */
1590 static int rbd_read_header(struct rbd_device *rbd_dev,
1591                            struct rbd_image_header *header)
1592 {
1593         ssize_t rc;
1594         struct rbd_image_header_ondisk *dh;
1595         int snap_count = 0;
1596         u64 ver;
1597         size_t len;
1598
1599         /*
1600          * First reads the fixed-size header to determine the number
1601          * of snapshots, then re-reads it, along with all snapshot
1602          * records as well as their stored names.
1603          */
1604         len = sizeof (*dh);
1605         while (1) {
1606                 dh = kmalloc(len, GFP_KERNEL);
1607                 if (!dh)
1608                         return -ENOMEM;
1609
1610                 rc = rbd_req_sync_read(rbd_dev,
1611                                        NULL, CEPH_NOSNAP,
1612                                        rbd_dev->obj_md_name,
1613                                        0, len,
1614                                        (char *)dh, &ver);
1615                 if (rc < 0)
1616                         goto out_dh;
1617
1618                 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1619                 if (rc < 0) {
1620                         if (rc == -ENXIO)
1621                                 pr_warning("unrecognized header format"
1622                                            " for image %s", rbd_dev->obj);
1623                         goto out_dh;
1624                 }
1625
1626                 if (snap_count == header->total_snaps)
1627                         break;
1628
1629                 snap_count = header->total_snaps;
1630                 len = sizeof (*dh) +
1631                         snap_count * sizeof(struct rbd_image_snap_ondisk) +
1632                         header->snap_names_len;
1633
1634                 rbd_header_free(header);
1635                 kfree(dh);
1636         }
1637         header->obj_version = ver;
1638
1639 out_dh:
1640         kfree(dh);
1641         return rc;
1642 }
1643
1644 /*
1645  * create a snapshot
1646  */
1647 static int rbd_header_add_snap(struct rbd_device *dev,
1648                                const char *snap_name,
1649                                gfp_t gfp_flags)
1650 {
1651         int name_len = strlen(snap_name);
1652         u64 new_snapid;
1653         int ret;
1654         void *data, *p, *e;
1655         u64 ver;
1656         struct ceph_mon_client *monc;
1657
1658         /* we should create a snapshot only if we're pointing at the head */
1659         if (dev->cur_snap)
1660                 return -EINVAL;
1661
1662         monc = &dev->rbd_client->client->monc;
1663         ret = ceph_monc_create_snapid(monc, dev->poolid, &new_snapid);
1664         dout("created snapid=%lld\n", new_snapid);
1665         if (ret < 0)
1666                 return ret;
1667
1668         data = kmalloc(name_len + 16, gfp_flags);
1669         if (!data)
1670                 return -ENOMEM;
1671
1672         p = data;
1673         e = data + name_len + 16;
1674
1675         ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1676         ceph_encode_64_safe(&p, e, new_snapid, bad);
1677
1678         ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1679                                 data, p - data, &ver);
1680
1681         kfree(data);
1682
1683         if (ret < 0)
1684                 return ret;
1685
1686         dev->header.snapc->seq =  new_snapid;
1687
1688         return 0;
1689 bad:
1690         return -ERANGE;
1691 }
1692
1693 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1694 {
1695         struct rbd_snap *snap;
1696
1697         while (!list_empty(&rbd_dev->snaps)) {
1698                 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1699                 __rbd_remove_snap_dev(rbd_dev, snap);
1700         }
1701 }
1702
1703 /*
1704  * only read the first part of the ondisk header, without the snaps info
1705  */
1706 static int __rbd_update_snaps(struct rbd_device *rbd_dev)
1707 {
1708         int ret;
1709         struct rbd_image_header h;
1710         u64 snap_seq;
1711         int follow_seq = 0;
1712
1713         ret = rbd_read_header(rbd_dev, &h);
1714         if (ret < 0)
1715                 return ret;
1716
1717         /* resized? */
1718         set_capacity(rbd_dev->disk, h.image_size / SECTOR_SIZE);
1719
1720         down_write(&rbd_dev->header_rwsem);
1721
1722         snap_seq = rbd_dev->header.snapc->seq;
1723         if (rbd_dev->header.total_snaps &&
1724             rbd_dev->header.snapc->snaps[0] == snap_seq)
1725                 /* pointing at the head, will need to follow that
1726                    if head moves */
1727                 follow_seq = 1;
1728
1729         kfree(rbd_dev->header.snapc);
1730         kfree(rbd_dev->header.snap_names);
1731         kfree(rbd_dev->header.snap_sizes);
1732
1733         rbd_dev->header.total_snaps = h.total_snaps;
1734         rbd_dev->header.snapc = h.snapc;
1735         rbd_dev->header.snap_names = h.snap_names;
1736         rbd_dev->header.snap_names_len = h.snap_names_len;
1737         rbd_dev->header.snap_sizes = h.snap_sizes;
1738         if (follow_seq)
1739                 rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
1740         else
1741                 rbd_dev->header.snapc->seq = snap_seq;
1742
1743         ret = __rbd_init_snaps_header(rbd_dev);
1744
1745         up_write(&rbd_dev->header_rwsem);
1746
1747         return ret;
1748 }
1749
1750 static int rbd_init_disk(struct rbd_device *rbd_dev)
1751 {
1752         struct gendisk *disk;
1753         struct request_queue *q;
1754         int rc;
1755         u64 segment_size;
1756         u64 total_size = 0;
1757
1758         /* contact OSD, request size info about the object being mapped */
1759         rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1760         if (rc)
1761                 return rc;
1762
1763         /* no need to lock here, as rbd_dev is not registered yet */
1764         rc = __rbd_init_snaps_header(rbd_dev);
1765         if (rc)
1766                 return rc;
1767
1768         rc = rbd_header_set_snap(rbd_dev, &total_size);
1769         if (rc)
1770                 return rc;
1771
1772         /* create gendisk info */
1773         rc = -ENOMEM;
1774         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1775         if (!disk)
1776                 goto out;
1777
1778         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1779                  rbd_dev->id);
1780         disk->major = rbd_dev->major;
1781         disk->first_minor = 0;
1782         disk->fops = &rbd_bd_ops;
1783         disk->private_data = rbd_dev;
1784
1785         /* init rq */
1786         rc = -ENOMEM;
1787         q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1788         if (!q)
1789                 goto out_disk;
1790
1791         /* We use the default size, but let's be explicit about it. */
1792         blk_queue_physical_block_size(q, SECTOR_SIZE);
1793
1794         /* set io sizes to object size */
1795         segment_size = rbd_obj_bytes(&rbd_dev->header);
1796         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1797         blk_queue_max_segment_size(q, segment_size);
1798         blk_queue_io_min(q, segment_size);
1799         blk_queue_io_opt(q, segment_size);
1800
1801         blk_queue_merge_bvec(q, rbd_merge_bvec);
1802         disk->queue = q;
1803
1804         q->queuedata = rbd_dev;
1805
1806         rbd_dev->disk = disk;
1807         rbd_dev->q = q;
1808
1809         /* finally, announce the disk to the world */
1810         set_capacity(disk, total_size / SECTOR_SIZE);
1811         add_disk(disk);
1812
1813         pr_info("%s: added with size 0x%llx\n",
1814                 disk->disk_name, (unsigned long long)total_size);
1815         return 0;
1816
1817 out_disk:
1818         put_disk(disk);
1819 out:
1820         return rc;
1821 }
1822
1823 /*
1824   sysfs
1825 */
1826
1827 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1828 {
1829         return container_of(dev, struct rbd_device, dev);
1830 }
1831
1832 static ssize_t rbd_size_show(struct device *dev,
1833                              struct device_attribute *attr, char *buf)
1834 {
1835         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1836
1837         return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1838 }
1839
1840 static ssize_t rbd_major_show(struct device *dev,
1841                               struct device_attribute *attr, char *buf)
1842 {
1843         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1844
1845         return sprintf(buf, "%d\n", rbd_dev->major);
1846 }
1847
1848 static ssize_t rbd_client_id_show(struct device *dev,
1849                                   struct device_attribute *attr, char *buf)
1850 {
1851         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1852
1853         return sprintf(buf, "client%lld\n",
1854                         ceph_client_id(rbd_dev->rbd_client->client));
1855 }
1856
1857 static ssize_t rbd_pool_show(struct device *dev,
1858                              struct device_attribute *attr, char *buf)
1859 {
1860         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1861
1862         return sprintf(buf, "%s\n", rbd_dev->pool_name);
1863 }
1864
1865 static ssize_t rbd_name_show(struct device *dev,
1866                              struct device_attribute *attr, char *buf)
1867 {
1868         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1869
1870         return sprintf(buf, "%s\n", rbd_dev->obj);
1871 }
1872
1873 static ssize_t rbd_snap_show(struct device *dev,
1874                              struct device_attribute *attr,
1875                              char *buf)
1876 {
1877         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1878
1879         return sprintf(buf, "%s\n", rbd_dev->snap_name);
1880 }
1881
1882 static ssize_t rbd_image_refresh(struct device *dev,
1883                                  struct device_attribute *attr,
1884                                  const char *buf,
1885                                  size_t size)
1886 {
1887         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1888         int rc;
1889         int ret = size;
1890
1891         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1892
1893         rc = __rbd_update_snaps(rbd_dev);
1894         if (rc < 0)
1895                 ret = rc;
1896
1897         mutex_unlock(&ctl_mutex);
1898         return ret;
1899 }
1900
/*
 * Per-device sysfs attributes.  The S_IRUGO entries are read-only and
 * served by the *_show handlers above; "refresh" and "create_snap" are
 * write-only (S_IWUSR) triggers handled by rbd_image_refresh() and
 * rbd_snap_add() respectively.
 */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);

/* NULL-terminated list of every attribute declared above. */
static struct attribute *rbd_attrs[] = {
        &dev_attr_size.attr,
        &dev_attr_major.attr,
        &dev_attr_client_id.attr,
        &dev_attr_pool.attr,
        &dev_attr_name.attr,
        &dev_attr_current_snap.attr,
        &dev_attr_refresh.attr,
        &dev_attr_create_snap.attr,
        NULL
};

/* Unnamed group wrapping rbd_attrs. */
static struct attribute_group rbd_attr_group = {
        .attrs = rbd_attrs,
};

/* NULL-terminated group list referenced by rbd_device_type.groups. */
static const struct attribute_group *rbd_attr_groups[] = {
        &rbd_attr_group,
        NULL
};
1930
/*
 * Release callback of rbd_device_type.  Intentionally empty: it frees
 * nothing itself.  rbd_bus_add_dev() additionally installs
 * rbd_dev_release() as the per-device release hook.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
}
1934
/* Device type assigned to every rbd device in rbd_bus_add_dev(). */
static struct device_type rbd_device_type = {
        .name           = "rbd",
        .groups         = rbd_attr_groups,
        .release        = rbd_sysfs_dev_release,
};
1940
1941
1942 /*
1943   sysfs - snapshots
1944 */
1945
1946 static ssize_t rbd_snap_size_show(struct device *dev,
1947                                   struct device_attribute *attr,
1948                                   char *buf)
1949 {
1950         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1951
1952         return sprintf(buf, "%zd\n", snap->size);
1953 }
1954
1955 static ssize_t rbd_snap_id_show(struct device *dev,
1956                                 struct device_attribute *attr,
1957                                 char *buf)
1958 {
1959         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1960
1961         return sprintf(buf, "%llu\n", (unsigned long long) snap->id);
1962 }
1963
/* Read-only sysfs attributes exposed by each snapshot device. */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);

/* NULL-terminated list of the snapshot attributes declared above. */
static struct attribute *rbd_snap_attrs[] = {
        &dev_attr_snap_size.attr,
        &dev_attr_snap_id.attr,
        NULL,
};

/* Unnamed group wrapping rbd_snap_attrs. */
static struct attribute_group rbd_snap_attr_group = {
        .attrs = rbd_snap_attrs,
};
1976
1977 static void rbd_snap_dev_release(struct device *dev)
1978 {
1979         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1980         kfree(snap->name);
1981         kfree(snap);
1982 }
1983
/* NULL-terminated group list referenced by rbd_snap_device_type.groups. */
static const struct attribute_group *rbd_snap_attr_groups[] = {
        &rbd_snap_attr_group,
        NULL
};

/* Device type assigned to snapshot devices in rbd_register_snap_dev(). */
static struct device_type rbd_snap_device_type = {
        .groups         = rbd_snap_attr_groups,
        .release        = rbd_snap_dev_release,
};
1993
1994 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
1995                                   struct rbd_snap *snap)
1996 {
1997         list_del(&snap->node);
1998         device_unregister(&snap->dev);
1999 }
2000
2001 static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
2002                                   struct rbd_snap *snap,
2003                                   struct device *parent)
2004 {
2005         struct device *dev = &snap->dev;
2006         int ret;
2007
2008         dev->type = &rbd_snap_device_type;
2009         dev->parent = parent;
2010         dev->release = rbd_snap_dev_release;
2011         dev_set_name(dev, "snap_%s", snap->name);
2012         ret = device_register(dev);
2013
2014         return ret;
2015 }
2016
2017 static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
2018                               int i, const char *name,
2019                               struct rbd_snap **snapp)
2020 {
2021         int ret;
2022         struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
2023         if (!snap)
2024                 return -ENOMEM;
2025         snap->name = kstrdup(name, GFP_KERNEL);
2026         snap->size = rbd_dev->header.snap_sizes[i];
2027         snap->id = rbd_dev->header.snapc->snaps[i];
2028         if (device_is_registered(&rbd_dev->dev)) {
2029                 ret = rbd_register_snap_dev(rbd_dev, snap,
2030                                              &rbd_dev->dev);
2031                 if (ret < 0)
2032                         goto err;
2033         }
2034         *snapp = snap;
2035         return 0;
2036 err:
2037         kfree(snap->name);
2038         kfree(snap);
2039         return ret;
2040 }
2041
/*
 * Step backwards through a list of '\0'-terminated names packed one
 * after another starting at @start.  @name points at the beginning of
 * some entry (or just past the end of the list).
 *
 * Returns a pointer to the entry immediately before @name, or NULL
 * when @name is less than two bytes past @start, i.e. no earlier
 * entry can exist.
 */
const char *rbd_prev_snap_name(const char *name, const char *start)
{
        const char *cur;

        if (name < start + 2)
                return NULL;

        /* name - 1 is the previous terminator; scan back from before it */
        for (cur = name - 2; *cur != '\0'; cur--) {
                if (cur == start)
                        return start;
        }

        /* stopped on the terminator of the entry before the one we want */
        return cur + 1;
}
2058
/*
 * Compare the old list of snapshots that we have to what's in the header
 * and update it accordingly.  Note that the header holds the snapshots
 * in reverse order (from newest to oldest) and we need to go from
 * older to newer so that we don't get a duplicate snap name while
 * processing (e.g., a snapshot was removed and a new one was created
 * with the same name).
 *
 * Returns 0 on success, -EINVAL if the packed name list runs out
 * before the header entries do, or the error returned by
 * __rbd_add_snap_dev().
 */
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
{
        const char *name, *first_name;
        /* number of header entries not yet handled; consumed from the end */
        int i = rbd_dev->header.total_snaps;
        struct rbd_snap *snap, *old_snap = NULL;
        int ret;
        struct list_head *p, *n;

        first_name = rbd_dev->header.snap_names;
        /* start just past the end of the packed name list */
        name = first_name + rbd_dev->header.snap_names_len;

        /*
         * Walk the existing list from tail to head.  The _safe variant
         * keeps the following node in n, so p may be removed while
         * iterating.
         */
        list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
                u64 cur_id;

                old_snap = list_entry(p, struct rbd_snap, node);

                /* cur_id is only set, and only read, while i is nonzero */
                if (i)
                        cur_id = rbd_dev->header.snapc->snaps[i - 1];

                if (!i || old_snap->id < cur_id) {
                        /* old_snap->id was skipped, thus was removed */
                        __rbd_remove_snap_dev(rbd_dev, old_snap);
                        continue;
                }
                if (old_snap->id == cur_id) {
                        /* we have this snapshot already */
                        i--;
                        name = rbd_prev_snap_name(name, first_name);
                        continue;
                }
                /*
                 * NOTE(review): this loop reads snaps[i] while the rest
                 * of the function uses snaps[i - 1] for the entry being
                 * handled, and it passes the current name together with
                 * index i - 1.  Confirm the indexing is intended.
                 */
                for (; i > 0;
                     i--, name = rbd_prev_snap_name(name, first_name)) {
                        if (!name) {
                                WARN_ON(1);
                                return -EINVAL;
                        }
                        cur_id = rbd_dev->header.snapc->snaps[i];
                        /* snapshot removal? handle it above */
                        if (cur_id >= old_snap->id)
                                break;
                        /* a new snapshot */
                        ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
                        if (ret < 0)
                                return ret;

                        /* note that we add it backward so using n and not p */
                        list_add(&snap->node, n);
                        p = &snap->node;
                }
        }
        /* we're done going over the old snap list, just add what's left */
        for (; i > 0; i--) {
                name = rbd_prev_snap_name(name, first_name);
                if (!name) {
                        WARN_ON(1);
                        return -EINVAL;
                }
                ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
                if (ret < 0)
                        return ret;
                /* list_add() inserts at the head of rbd_dev->snaps */
                list_add(&snap->node, &rbd_dev->snaps);
        }

        return 0;
}
2132
2133 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2134 {
2135         int ret;
2136         struct device *dev;
2137         struct rbd_snap *snap;
2138
2139         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2140         dev = &rbd_dev->dev;
2141
2142         dev->bus = &rbd_bus_type;
2143         dev->type = &rbd_device_type;
2144         dev->parent = &rbd_root_dev;
2145         dev->release = rbd_dev_release;
2146         dev_set_name(dev, "%d", rbd_dev->id);
2147         ret = device_register(dev);
2148         if (ret < 0)
2149                 goto out;
2150
2151         list_for_each_entry(snap, &rbd_dev->snaps, node) {
2152                 ret = rbd_register_snap_dev(rbd_dev, snap,
2153                                              &rbd_dev->dev);
2154                 if (ret < 0)
2155                         break;
2156         }
2157 out:
2158         mutex_unlock(&ctl_mutex);
2159         return ret;
2160 }
2161
2162 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2163 {
2164         device_unregister(&rbd_dev->dev);
2165 }
2166
2167 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2168 {
2169         int ret, rc;
2170
2171         do {
2172                 ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
2173                                          rbd_dev->header.obj_version);
2174                 if (ret == -ERANGE) {
2175                         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2176                         rc = __rbd_update_snaps(rbd_dev);
2177                         mutex_unlock(&ctl_mutex);
2178                         if (rc < 0)
2179                                 return rc;
2180                 }
2181         } while (ret == -ERANGE);
2182
2183         return ret;
2184 }
2185
/*
 * Largest rbd id currently considered in use.  Starts at 0; it is
 * incremented by rbd_id_get() and recomputed by rbd_id_put().
 */
static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2187
/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list.  The minimum rbd id is 1, because
 * rbd_id_max starts at 0 and is incremented before use.
 */
static void rbd_id_get(struct rbd_device *rbd_dev)
{
        rbd_dev->id = atomic64_inc_return(&rbd_id_max);

        /* the global device list is protected by rbd_dev_list_lock */
        spin_lock(&rbd_dev_list_lock);
        list_add_tail(&rbd_dev->node, &rbd_dev_list);
        spin_unlock(&rbd_dev_list_lock);
}
2200
2201 /*
2202  * Remove an rbd_dev from the global list, and record that its
2203  * identifier is no longer in use.
2204  */
2205 static void rbd_id_put(struct rbd_device *rbd_dev)
2206 {
2207         struct list_head *tmp;
2208         int rbd_id = rbd_dev->id;
2209         int max_id;
2210
2211         BUG_ON(rbd_id < 1);
2212
2213         spin_lock(&rbd_dev_list_lock);
2214         list_del_init(&rbd_dev->node);
2215
2216         /*
2217          * If the id being "put" is not the current maximum, there
2218          * is nothing special we need to do.
2219          */
2220         if (rbd_id != atomic64_read(&rbd_id_max)) {
2221                 spin_unlock(&rbd_dev_list_lock);
2222                 return;
2223         }
2224
2225         /*
2226          * We need to update the current maximum id.  Search the
2227          * list to find out what it is.  We're more likely to find
2228          * the maximum at the end, so search the list backward.
2229          */
2230         max_id = 0;
2231         list_for_each_prev(tmp, &rbd_dev_list) {
2232                 struct rbd_device *rbd_dev;
2233
2234                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2235                 if (rbd_id > max_id)
2236                         max_id = rbd_id;
2237         }
2238         spin_unlock(&rbd_dev_list_lock);
2239
2240         /*
2241          * The max id could have been updated by rbd_id_get(), in
2242          * which case it now accurately reflects the new maximum.
2243          * Be careful not to overwrite the maximum value in that
2244          * case.
2245          */
2246         atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2247 }
2248
/*
 * Advances *buf past any leading white space so that it points at the
 * first non-space character (or at the terminating '\0' if there is
 * none).  Returns the length of the token, i.e. the run of non-white
 * space characters starting there; 0 means no token was found.
 * *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
        /*
         * The characters for which isspace() is nonzero in the "C"
         * and "POSIX" locales.
         */
        static const char whitespace[] = " \f\n\r\t\v";
        const char *token = *buf + strspn(*buf, whitespace);

        *buf = token;

        return strcspn(token, whitespace);
}
2267
/*
 * Locates the next token in *buf and, provided it fits, copies it
 * into the supplied token buffer with a terminating '\0'.  *buf must
 * be terminated with '\0' on entry.
 *
 * Returns the length of the token found, not counting the '\0'.
 * A return of 0 means no token was found; a return >= token_size
 * means the token did not fit and nothing was copied.
 *
 * In every case *buf is advanced to just beyond the end of the token
 * that was found, even when it was too long to copy.
 */
static inline size_t copy_token(const char **buf,
                                char *token,
                                size_t token_size)
{
        size_t len = next_token(buf);
        const char *found = *buf;

        *buf = found + len;

        if (len >= token_size)
                return len;

        memcpy(token, found, len);
        token[len] = '\0';

        return len;
}
2297
2298 /*
2299  * This fills in the pool_name, obj, obj_len, snap_name, obj_len,
2300  * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2301  * on the list of monitor addresses and other options provided via
2302  * /sys/bus/rbd/add.
2303  */
2304 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2305                               const char *buf,
2306                               const char **mon_addrs,
2307                               size_t *mon_addrs_size,
2308                               char *options,
2309                               size_t options_size)
2310 {
2311         size_t  len;
2312
2313         /* The first four tokens are required */
2314
2315         len = next_token(&buf);
2316         if (!len)
2317                 return -EINVAL;
2318         *mon_addrs_size = len + 1;
2319         *mon_addrs = buf;
2320
2321         buf += len;
2322
2323         len = copy_token(&buf, options, options_size);
2324         if (!len || len >= options_size)
2325                 return -EINVAL;
2326
2327         len = copy_token(&buf, rbd_dev->pool_name, sizeof (rbd_dev->pool_name));
2328         if (!len || len >= sizeof (rbd_dev->pool_name))
2329                 return -EINVAL;
2330
2331         len = copy_token(&buf, rbd_dev->obj, sizeof (rbd_dev->obj));
2332         if (!len || len >= sizeof (rbd_dev->obj))
2333                 return -EINVAL;
2334
2335         /* We have the object length in hand, save it. */
2336
2337         rbd_dev->obj_len = len;
2338
2339         BUILD_BUG_ON(RBD_MAX_MD_NAME_LEN
2340                                 < RBD_MAX_OBJ_NAME_LEN + sizeof (RBD_SUFFIX));
2341         sprintf(rbd_dev->obj_md_name, "%s%s", rbd_dev->obj, RBD_SUFFIX);
2342
2343         /*
2344          * The snapshot name is optional, but it's an error if it's
2345          * too long.  If no snapshot is supplied, fill in the default.
2346          */
2347         len = copy_token(&buf, rbd_dev->snap_name, sizeof (rbd_dev->snap_name));
2348         if (!len)
2349                 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2350                         sizeof (RBD_SNAP_HEAD_NAME));
2351         else if (len >= sizeof (rbd_dev->snap_name))
2352                 return -EINVAL;
2353
2354         return 0;
2355 }
2356
2357 static ssize_t rbd_add(struct bus_type *bus,
2358                        const char *buf,
2359                        size_t count)
2360 {
2361         struct rbd_device *rbd_dev;
2362         const char *mon_addrs = NULL;
2363         size_t mon_addrs_size = 0;
2364         char *options = NULL;
2365         struct ceph_osd_client *osdc;
2366         int rc = -ENOMEM;
2367
2368         if (!try_module_get(THIS_MODULE))
2369                 return -ENODEV;
2370
2371         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2372         if (!rbd_dev)
2373                 goto err_nomem;
2374         options = kmalloc(count, GFP_KERNEL);
2375         if (!options)
2376                 goto err_nomem;
2377
2378         /* static rbd_device initialization */
2379         spin_lock_init(&rbd_dev->lock);
2380         INIT_LIST_HEAD(&rbd_dev->node);
2381         INIT_LIST_HEAD(&rbd_dev->snaps);
2382         init_rwsem(&rbd_dev->header_rwsem);
2383
2384         init_rwsem(&rbd_dev->header_rwsem);
2385
2386         /* generate unique id: find highest unique id, add one */
2387         rbd_id_get(rbd_dev);
2388
2389         /* Fill in the device name, now that we have its id. */
2390         BUILD_BUG_ON(DEV_NAME_LEN
2391                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2392         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->id);
2393
2394         /* parse add command */
2395         rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
2396                                 options, count);
2397         if (rc)
2398                 goto err_put_id;
2399
2400         rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2401                                                 options);
2402         if (IS_ERR(rbd_dev->rbd_client)) {
2403                 rc = PTR_ERR(rbd_dev->rbd_client);
2404                 goto err_put_id;
2405         }
2406
2407         /* pick the pool */
2408         osdc = &rbd_dev->rbd_client->client->osdc;
2409         rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2410         if (rc < 0)
2411                 goto err_out_client;
2412         rbd_dev->poolid = rc;
2413
2414         /* register our block device */
2415         rc = register_blkdev(0, rbd_dev->name);
2416         if (rc < 0)
2417                 goto err_out_client;
2418         rbd_dev->major = rc;
2419
2420         rc = rbd_bus_add_dev(rbd_dev);
2421         if (rc)
2422                 goto err_out_blkdev;
2423
2424         /*
2425          * At this point cleanup in the event of an error is the job
2426          * of the sysfs code (initiated by rbd_bus_del_dev()).
2427          *
2428          * Set up and announce blkdev mapping.
2429          */
2430         rc = rbd_init_disk(rbd_dev);
2431         if (rc)
2432                 goto err_out_bus;
2433
2434         rc = rbd_init_watch_dev(rbd_dev);
2435         if (rc)
2436                 goto err_out_bus;
2437
2438         return count;
2439
2440 err_out_bus:
2441         /* this will also clean up rest of rbd_dev stuff */
2442
2443         rbd_bus_del_dev(rbd_dev);
2444         kfree(options);
2445         return rc;
2446
2447 err_out_blkdev:
2448         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2449 err_out_client:
2450         rbd_put_client(rbd_dev);
2451 err_put_id:
2452         rbd_id_put(rbd_dev);
2453 err_nomem:
2454         kfree(options);
2455         kfree(rbd_dev);
2456
2457         dout("Error adding device %s\n", buf);
2458         module_put(THIS_MODULE);
2459
2460         return (ssize_t) rc;
2461 }
2462
2463 static struct rbd_device *__rbd_get_dev(unsigned long id)
2464 {
2465         struct list_head *tmp;
2466         struct rbd_device *rbd_dev;
2467
2468         spin_lock(&rbd_dev_list_lock);
2469         list_for_each(tmp, &rbd_dev_list) {
2470                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2471                 if (rbd_dev->id == id) {
2472                         spin_unlock(&rbd_dev_list_lock);
2473                         return rbd_dev;
2474                 }
2475         }
2476         spin_unlock(&rbd_dev_list_lock);
2477         return NULL;
2478 }
2479
2480 static void rbd_dev_release(struct device *dev)
2481 {
2482         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2483
2484         if (rbd_dev->watch_request) {
2485                 struct ceph_client *client = rbd_dev->rbd_client->client;
2486
2487                 ceph_osdc_unregister_linger_request(&client->osdc,
2488                                                     rbd_dev->watch_request);
2489         }
2490         if (rbd_dev->watch_event)
2491                 rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);
2492
2493         rbd_put_client(rbd_dev);
2494
2495         /* clean up and free blkdev */
2496         rbd_free_disk(rbd_dev);
2497         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2498
2499         /* done with the id, and with the rbd_dev */
2500         rbd_id_put(rbd_dev);
2501         kfree(rbd_dev);
2502
2503         /* release module ref */
2504         module_put(THIS_MODULE);
2505 }
2506
2507 static ssize_t rbd_remove(struct bus_type *bus,
2508                           const char *buf,
2509                           size_t count)
2510 {
2511         struct rbd_device *rbd_dev = NULL;
2512         int target_id, rc;
2513         unsigned long ul;
2514         int ret = count;
2515
2516         rc = strict_strtoul(buf, 10, &ul);
2517         if (rc)
2518                 return rc;
2519
2520         /* convert to int; abort if we lost anything in the conversion */
2521         target_id = (int) ul;
2522         if (target_id != ul)
2523                 return -EINVAL;
2524
2525         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2526
2527         rbd_dev = __rbd_get_dev(target_id);
2528         if (!rbd_dev) {
2529                 ret = -ENOENT;
2530                 goto done;
2531         }
2532
2533         __rbd_remove_all_snaps(rbd_dev);
2534         rbd_bus_del_dev(rbd_dev);
2535
2536 done:
2537         mutex_unlock(&ctl_mutex);
2538         return ret;
2539 }
2540
2541 static ssize_t rbd_snap_add(struct device *dev,
2542                             struct device_attribute *attr,
2543                             const char *buf,
2544                             size_t count)
2545 {
2546         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2547         int ret;
2548         char *name = kmalloc(count + 1, GFP_KERNEL);
2549         if (!name)
2550                 return -ENOMEM;
2551
2552         snprintf(name, count, "%s", buf);
2553
2554         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2555
2556         ret = rbd_header_add_snap(rbd_dev,
2557                                   name, GFP_KERNEL);
2558         if (ret < 0)
2559                 goto err_unlock;
2560
2561         ret = __rbd_update_snaps(rbd_dev);
2562         if (ret < 0)
2563                 goto err_unlock;
2564
2565         /* shouldn't hold ctl_mutex when notifying.. notify might
2566            trigger a watch callback that would need to get that mutex */
2567         mutex_unlock(&ctl_mutex);
2568
2569         /* make a best effort, don't error if failed */
2570         rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
2571
2572         ret = count;
2573         kfree(name);
2574         return ret;
2575
2576 err_unlock:
2577         mutex_unlock(&ctl_mutex);
2578         kfree(name);
2579         return ret;
2580 }
2581
2582 /*
2583  * create control files in sysfs
2584  * /sys/bus/rbd/...
2585  */
2586 static int rbd_sysfs_init(void)
2587 {
2588         int ret;
2589
2590         ret = device_register(&rbd_root_dev);
2591         if (ret < 0)
2592                 return ret;
2593
2594         ret = bus_register(&rbd_bus_type);
2595         if (ret < 0)
2596                 device_unregister(&rbd_root_dev);
2597
2598         return ret;
2599 }
2600
/*
 * Undo rbd_sysfs_init(): unregister the bus first, then the root
 * device (the reverse of the order in which they were registered).
 */
static void rbd_sysfs_cleanup(void)
{
        bus_unregister(&rbd_bus_type);
        device_unregister(&rbd_root_dev);
}
2606
2607 int __init rbd_init(void)
2608 {
2609         int rc;
2610
2611         rc = rbd_sysfs_init();
2612         if (rc)
2613                 return rc;
2614         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2615         return 0;
2616 }
2617
/* Module exit point: remove the sysfs control interface. */
void __exit rbd_exit(void)
{
        rbd_sysfs_cleanup();
}
2622
/* Hook the init/exit routines into the module loader. */
module_init(rbd_init);
module_exit(rbd_exit);

/* Module metadata */
MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");