Linux Kernel 3.7.1
file.c

#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/sched.h>
#include <linux/slab.h>
#include <linux/file.h>
#include <linux/mount.h>
#include <linux/namei.h>
#include <linux/writeback.h>

#include "super.h"
#include "mds_client.h"

/*
 * Ceph file operations
 *
 * Implement basic open/close functionality, and implement
 * read/write.
 *
 * We implement three modes of file I/O:
 *  - buffered uses the generic_file_aio_{read,write} helpers
 *
 *  - synchronous is used when there is multi-client read/write
 *    sharing, avoids the page cache, and synchronously waits for an
 *    ack from the OSD.
 *
 *  - direct io takes the variant of the sync path that references
 *    user pages directly.
 *
 * fsync() flushes and waits on dirty pages, but just queues metadata
 * for writeback: since the MDS can recover size and mtime there is no
 * need to wait for MDS acknowledgement.
 */
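
/*
 * A condensed sketch of how the mode is chosen (see ceph_aio_read()
 * and ceph_aio_write() below): the sync path is taken whenever the
 * page cache cannot be used safely or is explicitly bypassed,
 * roughly:
 *
 *	if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 ||
 *	    (file->f_flags & O_DIRECT) ||
 *	    (inode->i_sb->s_flags & MS_SYNCHRONOUS) ||
 *	    (fi->flags & CEPH_F_SYNC))
 *		... go synchronous (ceph_sync_read/ceph_sync_write) ...
 *	else
 *		... go buffered (generic_file_aio_{read,write}) ...
 */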

/*
 * Prepare an open request.  Preallocate ceph_cap to avoid an
 * inopportune ENOMEM later.
 */
static struct ceph_mds_request *
prepare_open_request(struct super_block *sb, int flags, int create_mode)
{
        struct ceph_fs_client *fsc = ceph_sb_to_client(sb);
        struct ceph_mds_client *mdsc = fsc->mdsc;
        struct ceph_mds_request *req;
        int want_auth = USE_ANY_MDS;
        int op = (flags & O_CREAT) ? CEPH_MDS_OP_CREATE : CEPH_MDS_OP_OPEN;

        if (flags & (O_WRONLY|O_RDWR|O_CREAT|O_TRUNC))
                want_auth = USE_AUTH_MDS;

        req = ceph_mdsc_create_request(mdsc, op, want_auth);
        if (IS_ERR(req))
                goto out;
        req->r_fmode = ceph_flags_to_mode(flags);
        req->r_args.open.flags = cpu_to_le32(flags);
        req->r_args.open.mode = cpu_to_le32(create_mode);
out:
        return req;
}

/*
 * initialize private struct file data.
 * if we fail, clean up by dropping fmode reference on the ceph_inode
 */
static int ceph_init_file(struct inode *inode, struct file *file, int fmode)
{
        struct ceph_file_info *cf;
        int ret = 0;

        switch (inode->i_mode & S_IFMT) {
        case S_IFREG:
        case S_IFDIR:
                dout("init_file %p %p 0%o (regular)\n", inode, file,
                     inode->i_mode);
                cf = kmem_cache_alloc(ceph_file_cachep, GFP_NOFS | __GFP_ZERO);
                if (cf == NULL) {
                        ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
                        return -ENOMEM;
                }
                cf->fmode = fmode;
                cf->next_offset = 2;
                file->private_data = cf;
                BUG_ON(inode->i_fop->release != ceph_release);
                break;

        case S_IFLNK:
                dout("init_file %p %p 0%o (symlink)\n", inode, file,
                     inode->i_mode);
                ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
                break;

        default:
                dout("init_file %p %p 0%o (special)\n", inode, file,
                     inode->i_mode);
                /*
                 * we need to drop the open ref now, since we don't
                 * have .release set to ceph_release.
                 */
                ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */
                BUG_ON(inode->i_fop->release == ceph_release);

                /* call the proper open fop */
                ret = inode->i_fop->open(inode, file);
        }
        return ret;
}

/*
 * If we already have the requisite capabilities, we can satisfy
 * the open request locally (no need to request new caps from the
 * MDS).  We do, however, need to inform the MDS (asynchronously)
 * if our wanted caps set expands.
 */
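
/*
 * For example (hypothetical scenario): a second read-only open of a
 * file on which this client already holds real caps takes the local
 * path below and never waits on the MDS; ceph_check_caps() is only
 * called, asynchronously, if the wanted set is neither fully issued
 * nor already requested from the MDS.
 */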
int ceph_open(struct inode *inode, struct file *file)
{
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb);
        struct ceph_mds_client *mdsc = fsc->mdsc;
        struct ceph_mds_request *req;
        struct ceph_file_info *cf = file->private_data;
        struct inode *parent_inode = NULL;
        int err;
        int flags, fmode, wanted;

        if (cf) {
                dout("open file %p is already opened\n", file);
                return 0;
        }

        /* filter out O_CREAT|O_EXCL; vfs did that already.  yuck. */
        flags = file->f_flags & ~(O_CREAT|O_EXCL);
        if (S_ISDIR(inode->i_mode))
                flags = O_DIRECTORY;  /* mds likes to know */

        dout("open inode %p ino %llx.%llx file %p flags %d (%d)\n", inode,
             ceph_vinop(inode), file, flags, file->f_flags);
        fmode = ceph_flags_to_mode(flags);
        wanted = ceph_caps_for_mode(fmode);

        /* snapped files are read-only */
        if (ceph_snap(inode) != CEPH_NOSNAP && (file->f_mode & FMODE_WRITE))
                return -EROFS;

        /* trivially open snapdir */
        if (ceph_snap(inode) == CEPH_SNAPDIR) {
                spin_lock(&ci->i_ceph_lock);
                __ceph_get_fmode(ci, fmode);
                spin_unlock(&ci->i_ceph_lock);
                return ceph_init_file(inode, file, fmode);
        }

        /*
         * No need to block if we have caps on the auth MDS (for
         * write) or any MDS (for read).  Update wanted set
         * asynchronously.
         */
        spin_lock(&ci->i_ceph_lock);
        if (__ceph_is_any_real_caps(ci) &&
            (((fmode & CEPH_FILE_MODE_WR) == 0) || ci->i_auth_cap)) {
                int mds_wanted = __ceph_caps_mds_wanted(ci);
                int issued = __ceph_caps_issued(ci, NULL);

                dout("open %p fmode %d want %s issued %s using existing\n",
                     inode, fmode, ceph_cap_string(wanted),
                     ceph_cap_string(issued));
                __ceph_get_fmode(ci, fmode);
                spin_unlock(&ci->i_ceph_lock);

                /* adjust wanted? */
                if ((issued & wanted) != wanted &&
                    (mds_wanted & wanted) != wanted &&
                    ceph_snap(inode) != CEPH_SNAPDIR)
                        ceph_check_caps(ci, 0, NULL);

                return ceph_init_file(inode, file, fmode);
        } else if (ceph_snap(inode) != CEPH_NOSNAP &&
                   (ci->i_snap_caps & wanted) == wanted) {
                __ceph_get_fmode(ci, fmode);
                spin_unlock(&ci->i_ceph_lock);
                return ceph_init_file(inode, file, fmode);
        }
        spin_unlock(&ci->i_ceph_lock);

        dout("open fmode %d wants %s\n", fmode, ceph_cap_string(wanted));
        req = prepare_open_request(inode->i_sb, flags, 0);
        if (IS_ERR(req)) {
                err = PTR_ERR(req);
                goto out;
        }
        req->r_inode = inode;
        ihold(inode);
        req->r_num_caps = 1;
        if (flags & (O_CREAT|O_TRUNC))
                parent_inode = ceph_get_dentry_parent_inode(file->f_dentry);
        err = ceph_mdsc_do_request(mdsc, parent_inode, req);
        iput(parent_inode);
        if (!err)
                err = ceph_init_file(inode, file, req->r_fmode);
        ceph_mdsc_put_request(req);
        dout("open result=%d on %llx.%llx\n", err, ceph_vinop(inode));
out:
        return err;
}


/*
 * Do a lookup + open with a single request.  If we get a non-existent
 * file or symlink, return 1 so the VFS can retry.
 */
int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
                     struct file *file, unsigned flags, umode_t mode,
                     int *opened)
{
        struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
        struct ceph_mds_client *mdsc = fsc->mdsc;
        struct ceph_mds_request *req;
        struct dentry *dn;
        int err;

        dout("atomic_open %p dentry %p '%.*s' %s flags %d mode 0%o\n",
             dir, dentry, dentry->d_name.len, dentry->d_name.name,
             d_unhashed(dentry) ? "unhashed" : "hashed", flags, mode);

        if (dentry->d_name.len > NAME_MAX)
                return -ENAMETOOLONG;

        err = ceph_init_dentry(dentry);
        if (err < 0)
                return err;

        /* do the open */
        req = prepare_open_request(dir->i_sb, flags, mode);
        if (IS_ERR(req))
                return PTR_ERR(req);
        req->r_dentry = dget(dentry);
        req->r_num_caps = 2;
        if (flags & O_CREAT) {
                req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
                req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
        }
        req->r_locked_dir = dir;        /* caller holds dir->i_mutex */
        err = ceph_mdsc_do_request(mdsc,
                                   (flags & (O_CREAT|O_TRUNC)) ? dir : NULL,
                                   req);
        err = ceph_handle_snapdir(req, dentry, err);
        if (err == 0 && (flags & O_CREAT) && !req->r_reply_info.head->is_dentry)
                err = ceph_handle_notrace_create(dir, dentry);

        if (d_unhashed(dentry)) {
                dn = ceph_finish_lookup(req, dentry, err);
                if (IS_ERR(dn))
                        err = PTR_ERR(dn);
        } else {
                /* we were given a hashed negative dentry */
                dn = NULL;
        }
        if (err)
                goto out_err;
        if (dn || dentry->d_inode == NULL || S_ISLNK(dentry->d_inode->i_mode)) {
                /* make vfs retry on splice, ENOENT, or symlink */
                dout("atomic_open finish_no_open on dn %p\n", dn);
                err = finish_no_open(file, dn);
        } else {
                dout("atomic_open finish_open on dn %p\n", dn);
                err = finish_open(file, dentry, ceph_open, opened);
        }

out_err:
        ceph_mdsc_put_request(req);
        dout("atomic_open result=%d\n", err);
        return err;
}

int ceph_release(struct inode *inode, struct file *file)
{
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_file_info *cf = file->private_data;

        dout("release inode %p file %p\n", inode, file);
        ceph_put_fmode(ci, cf->fmode);
        if (cf->last_readdir)
                ceph_mdsc_put_request(cf->last_readdir);
        kfree(cf->last_name);
        kfree(cf->dir_info);
        dput(cf->dentry);
        kmem_cache_free(ceph_file_cachep, cf);

        /* wake up anyone waiting for caps on this inode */
        wake_up_all(&ci->i_cap_wq);
        return 0;
}

/*
 * Read a range of bytes striped over one or more objects.  Iterate over
 * objects we stripe over.  (That's not atomic, but good enough for now.)
 *
 * If we get a short result from the OSD, check against i_size; we need to
 * only return a short read to the caller if we hit EOF.
 */
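
/*
 * Worked example (assuming the default 4 MB object size and no custom
 * striping): a 6 MB read at offset 0 is clipped by
 * ceph_osdc_readpages() to the end of the first object, so this_len
 * comes back as 4 MB < left; hit_stripe is set and we loop back to
 * 'more' to read the remaining 2 MB from the next object.
 */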
static int striped_read(struct inode *inode,
                        u64 off, u64 len,
                        struct page **pages, int num_pages,
                        int *checkeof, bool o_direct,
                        unsigned long buf_align)
{
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        struct ceph_inode_info *ci = ceph_inode(inode);
        u64 pos, this_len;
        int io_align, page_align;
        int left, pages_left;
        int read;
        struct page **page_pos;
        int ret;
        bool hit_stripe, was_short;

        /*
         * we may need to do multiple reads.  not atomic, unfortunately.
         */
        pos = off;
        left = len;
        page_pos = pages;
        pages_left = num_pages;
        read = 0;
        io_align = off & ~PAGE_MASK;

more:
        if (o_direct)
                page_align = (pos - io_align + buf_align) & ~PAGE_MASK;
        else
                page_align = pos & ~PAGE_MASK;
        this_len = left;
        ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode),
                                  &ci->i_layout, pos, &this_len,
                                  ci->i_truncate_seq,
                                  ci->i_truncate_size,
                                  page_pos, pages_left, page_align);
        if (ret == -ENOENT)
                ret = 0;
        hit_stripe = this_len < left;
        was_short = ret >= 0 && ret < this_len;
        dout("striped_read %llu~%u (read %u) got %d%s%s\n", pos, left, read,
             ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : "");

        if (ret > 0) {
                int didpages = (page_align + ret) >> PAGE_CACHE_SHIFT;

                if (read < pos - off) {
                        dout(" zero gap %llu to %llu\n", off + read, pos);
                        ceph_zero_page_vector_range(page_align + read,
                                                    pos - off - read, pages);
                }
                pos += ret;
                read = pos - off;
                left -= ret;
                page_pos += didpages;
                pages_left -= didpages;

                /* hit stripe? */
                if (left && hit_stripe)
                        goto more;
        }

        if (was_short) {
                /* did we bounce off eof? */
                if (pos + left > inode->i_size)
                        *checkeof = 1;

                /* zero trailing bytes (inside i_size) */
                if (left > 0 && pos < inode->i_size) {
                        if (pos + left > inode->i_size)
                                left = inode->i_size - pos;

                        dout("zero tail %d\n", left);
                        ceph_zero_page_vector_range(page_align + read, left,
                                                    pages);
                        read += left;
                }
        }

        if (ret >= 0)
                ret = read;
        dout("striped_read returns %d\n", ret);
        return ret;
}

/*
 * Completely synchronous read and write methods.  Direct from __user
 * buffer to osd, or directly to user pages (if O_DIRECT).
 *
 * If the read spans object boundary, just do multiple reads.
 */
static ssize_t ceph_sync_read(struct file *file, char __user *data,
                              unsigned len, loff_t *poff, int *checkeof)
{
        struct inode *inode = file->f_dentry->d_inode;
        struct page **pages;
        u64 off = *poff;
        int num_pages, ret;

        dout("sync_read on file %p %llu~%u %s\n", file, off, len,
             (file->f_flags & O_DIRECT) ? "O_DIRECT" : "");

        if (file->f_flags & O_DIRECT) {
                num_pages = calc_pages_for((unsigned long)data, len);
                pages = ceph_get_direct_page_vector(data, num_pages, true);
        } else {
                num_pages = calc_pages_for(off, len);
                pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
        }
        if (IS_ERR(pages))
                return PTR_ERR(pages);

        /*
         * flush any page cache pages in this range.  this
         * will make concurrent normal and sync io slow,
         * but it will at least behave sensibly when they are
         * in sequence.
         */
        ret = filemap_write_and_wait(inode->i_mapping);
        if (ret < 0)
                goto done;

        ret = striped_read(inode, off, len, pages, num_pages, checkeof,
                           file->f_flags & O_DIRECT,
                           (unsigned long)data & ~PAGE_MASK);

        if (ret >= 0 && (file->f_flags & O_DIRECT) == 0)
                ret = ceph_copy_page_vector_to_user(pages, data, off, ret);
        if (ret >= 0)
                *poff = off + ret;

done:
        if (file->f_flags & O_DIRECT)
                ceph_put_page_vector(pages, num_pages, true);
        else
                ceph_release_page_vector(pages, num_pages);
        dout("sync_read result %d\n", ret);
        return ret;
}

/*
 * Write commit callback, called if we requested both an ACK and
 * ONDISK commit reply from the OSD.
 */
static void sync_write_commit(struct ceph_osd_request *req,
                              struct ceph_msg *msg)
{
        struct ceph_inode_info *ci = ceph_inode(req->r_inode);

        dout("sync_write_commit %p tid %llu\n", req, req->r_tid);
        spin_lock(&ci->i_unsafe_lock);
        list_del_init(&req->r_unsafe_item);
        spin_unlock(&ci->i_unsafe_lock);
        ceph_put_cap_refs(ci, CEPH_CAP_FILE_WR);
}

/*
 * Synchronous write, straight from __user pointer or user pages (if
 * O_DIRECT).
 *
 * If write spans object boundary, just do multiple writes.  (For a
 * correct atomic write, we should e.g. take write locks on all
 * objects, rollback on failure, etc.)
 */
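
/*
 * Sketch of the flag selection done below: every sync write requests
 * an ONDISK (commit) reply; callers that asked for neither O_SYNC nor
 * O_DIRECT additionally request an ACK so the write can return as
 * soon as it is applied, leaving sync_write_commit() above to handle
 * the eventual commit reply.
 */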
static ssize_t ceph_sync_write(struct file *file, const char __user *data,
                               size_t left, loff_t *offset)
{
        struct inode *inode = file->f_dentry->d_inode;
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        struct ceph_osd_request *req;
        struct page **pages;
        int num_pages;
        long long unsigned pos;
        u64 len;
        int written = 0;
        int flags;
        int do_sync = 0;
        int check_caps = 0;
        int page_align, io_align;
        unsigned long buf_align;
        int ret;
        struct timespec mtime = CURRENT_TIME;

        if (ceph_snap(file->f_dentry->d_inode) != CEPH_NOSNAP)
                return -EROFS;

        dout("sync_write on file %p %lld~%u %s\n", file, *offset,
             (unsigned)left, (file->f_flags & O_DIRECT) ? "O_DIRECT" : "");

        if (file->f_flags & O_APPEND)
                pos = i_size_read(inode);
        else
                pos = *offset;

        ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + left);
        if (ret < 0)
                return ret;

        ret = invalidate_inode_pages2_range(inode->i_mapping,
                                            pos >> PAGE_CACHE_SHIFT,
                                            (pos + left) >> PAGE_CACHE_SHIFT);
        if (ret < 0)
                dout("invalidate_inode_pages2_range returned %d\n", ret);

        flags = CEPH_OSD_FLAG_ORDERSNAP |
                CEPH_OSD_FLAG_ONDISK |
                CEPH_OSD_FLAG_WRITE;
        if ((file->f_flags & (O_SYNC|O_DIRECT)) == 0)
                flags |= CEPH_OSD_FLAG_ACK;
        else
                do_sync = 1;

        /*
         * we may need to do multiple writes here if we span an object
         * boundary.  this isn't atomic, unfortunately.  :(
         */
more:
        io_align = pos & ~PAGE_MASK;
        buf_align = (unsigned long)data & ~PAGE_MASK;
        len = left;
        if (file->f_flags & O_DIRECT) {
                /* write from beginning of first page, regardless of
                   io alignment */
                page_align = (pos - io_align + buf_align) & ~PAGE_MASK;
                num_pages = calc_pages_for((unsigned long)data, len);
        } else {
                page_align = pos & ~PAGE_MASK;
                num_pages = calc_pages_for(pos, len);
        }
        req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
                                    ceph_vino(inode), pos, &len,
                                    CEPH_OSD_OP_WRITE, flags,
                                    ci->i_snap_realm->cached_context,
                                    do_sync,
                                    ci->i_truncate_seq, ci->i_truncate_size,
                                    &mtime, false, 2, page_align);
        if (IS_ERR(req))
                return PTR_ERR(req);

        if (file->f_flags & O_DIRECT) {
                pages = ceph_get_direct_page_vector(data, num_pages, false);
                if (IS_ERR(pages)) {
                        ret = PTR_ERR(pages);
                        goto out;
                }

                /*
                 * throw out any page cache pages in this range.  this
                 * may block.
                 */
                truncate_inode_pages_range(inode->i_mapping, pos,
                                           (pos+len) | (PAGE_CACHE_SIZE-1));
        } else {
                pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
                if (IS_ERR(pages)) {
                        ret = PTR_ERR(pages);
                        goto out;
                }
                ret = ceph_copy_user_to_page_vector(pages, data, pos, len);
                if (ret < 0) {
                        ceph_release_page_vector(pages, num_pages);
                        goto out;
                }

                if ((file->f_flags & O_SYNC) == 0) {
                        /* get a second commit callback */
                        req->r_safe_callback = sync_write_commit;
                        req->r_own_pages = 1;
                }
        }
        req->r_pages = pages;
        req->r_num_pages = num_pages;
        req->r_inode = inode;

        ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
        if (!ret) {
                if (req->r_safe_callback) {
                        /*
                         * Add to inode unsafe list only after we
                         * start_request so that a tid has been assigned.
                         */
                        spin_lock(&ci->i_unsafe_lock);
                        list_add_tail(&req->r_unsafe_item,
                                      &ci->i_unsafe_writes);
                        spin_unlock(&ci->i_unsafe_lock);
                        ceph_get_cap_refs(ci, CEPH_CAP_FILE_WR);
                }

                ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
                if (ret < 0 && req->r_safe_callback) {
                        spin_lock(&ci->i_unsafe_lock);
                        list_del_init(&req->r_unsafe_item);
                        spin_unlock(&ci->i_unsafe_lock);
                        ceph_put_cap_refs(ci, CEPH_CAP_FILE_WR);
                }
        }

        if (file->f_flags & O_DIRECT)
                ceph_put_page_vector(pages, num_pages, false);
        else if (file->f_flags & O_SYNC)
                ceph_release_page_vector(pages, num_pages);

out:
        ceph_osdc_put_request(req);
        if (ret == 0) {
                pos += len;
                written += len;
                left -= len;
                data += len;    /* advance by the chunk just written */
                if (left)
                        goto more;

                ret = written;
                *offset = pos;
                if (pos > i_size_read(inode))
                        check_caps = ceph_inode_set_size(inode, pos);
                if (check_caps)
                        ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY,
                                        NULL);
        }
        return ret;
}
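
/*
 * Lifecycle of an "unsafe" write in the path above: once
 * start_request has assigned a tid, the request is put on
 * ci->i_unsafe_writes and a Fw cap reference is taken; when the
 * ONDISK reply arrives, sync_write_commit() unlinks the request and
 * drops that reference, so Fw cannot be released back to the MDS
 * while a write is acked but not yet durable.
 */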

/*
 * Wrap generic_file_aio_read with checks for cap bits on the inode.
 * Atomically grab references, so that those bits are not released
 * back to the MDS mid-read.
 *
 * Hmm, the sync read case isn't actually async... should it be?
 */
static ssize_t ceph_aio_read(struct kiocb *iocb, const struct iovec *iov,
                             unsigned long nr_segs, loff_t pos)
{
        struct file *filp = iocb->ki_filp;
        struct ceph_file_info *fi = filp->private_data;
        loff_t *ppos = &iocb->ki_pos;
        size_t len = iov->iov_len;
        struct inode *inode = filp->f_dentry->d_inode;
        struct ceph_inode_info *ci = ceph_inode(inode);
        void __user *base = iov->iov_base;
        ssize_t ret;
        int want, got = 0;
        int checkeof = 0, read = 0;

        dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n",
             inode, ceph_vinop(inode), pos, (unsigned)len, inode);
again:
        __ceph_do_pending_vmtruncate(inode);
        if (fi->fmode & CEPH_FILE_MODE_LAZY)
                want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
        else
                want = CEPH_CAP_FILE_CACHE;
        ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1);
        if (ret < 0)
                goto out;
        dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n",
             inode, ceph_vinop(inode), pos, (unsigned)len,
             ceph_cap_string(got));

        if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 ||
            (iocb->ki_filp->f_flags & O_DIRECT) ||
            (inode->i_sb->s_flags & MS_SYNCHRONOUS) ||
            (fi->flags & CEPH_F_SYNC))
                /* hmm, this isn't really async... */
                ret = ceph_sync_read(filp, base, len, ppos, &checkeof);
        else
                ret = generic_file_aio_read(iocb, iov, nr_segs, pos);

out:
        dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n",
             inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret);
        ceph_put_cap_refs(ci, got);

        if (checkeof && ret >= 0) {
                int statret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE);

                /* hit EOF or hole? */
                if (statret == 0 && *ppos < inode->i_size) {
                        dout("aio_read sync_read hit hole, ppos %lld < size %lld, reading more\n", *ppos, inode->i_size);
                        read += ret;
                        base += ret;
                        len -= ret;
                        checkeof = 0;
                        goto again;
                }
        }
        if (ret >= 0)
                ret += read;

        return ret;
}

/*
 * Take cap references to avoid releasing caps to MDS mid-write.
 *
 * If we are synchronous, and write with an old snap context, the OSD
 * may return EOLDSNAPC.  In that case, retry the write.. _after_
 * dropping our cap refs and allowing the pending snap to logically
 * complete _before_ this write occurs.
 *
 * If we are near ENOSPC, write synchronously.
 */
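
/*
 * Hypothetical EOLDSNAPC timeline, for illustration: a snapshot is
 * taken between our cap grab and the OSD write; the OSD rejects the
 * now-stale snap context with -EOLDSNAPC, we drop our cap refs, and
 * we jump back to retry_snap so the write is redone under the new
 * snap context.
 */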
static ssize_t ceph_aio_write(struct kiocb *iocb, const struct iovec *iov,
                              unsigned long nr_segs, loff_t pos)
{
        struct file *file = iocb->ki_filp;
        struct ceph_file_info *fi = file->private_data;
        struct inode *inode = file->f_dentry->d_inode;
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_osd_client *osdc =
                &ceph_sb_to_client(inode->i_sb)->client->osdc;
        loff_t endoff = pos + iov->iov_len;
        int want, got = 0;
        int ret, err;

        if (ceph_snap(inode) != CEPH_NOSNAP)
                return -EROFS;

retry_snap:
        if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL))
                return -ENOSPC;
        __ceph_do_pending_vmtruncate(inode);
        dout("aio_write %p %llx.%llx %llu~%u getting caps. i_size %llu\n",
             inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len,
             inode->i_size);
        if (fi->fmode & CEPH_FILE_MODE_LAZY)
                want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
        else
                want = CEPH_CAP_FILE_BUFFER;
        ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, endoff);
        if (ret < 0)
                goto out_put;

        dout("aio_write %p %llx.%llx %llu~%u got cap refs on %s\n",
             inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len,
             ceph_cap_string(got));

        if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
            (iocb->ki_filp->f_flags & O_DIRECT) ||
            (inode->i_sb->s_flags & MS_SYNCHRONOUS) ||
            (fi->flags & CEPH_F_SYNC)) {
                ret = ceph_sync_write(file, iov->iov_base, iov->iov_len,
                                      &iocb->ki_pos);
        } else {
                /*
                 * buffered write; drop Fw early to avoid slow
                 * revocation if we get stuck on balance_dirty_pages
                 */
                int dirty;

                spin_lock(&ci->i_ceph_lock);
                dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
                spin_unlock(&ci->i_ceph_lock);
                ceph_put_cap_refs(ci, got);

                ret = generic_file_aio_write(iocb, iov, nr_segs, pos);
                if ((ret >= 0 || ret == -EIOCBQUEUED) &&
                    ((file->f_flags & O_SYNC) || IS_SYNC(file->f_mapping->host)
                     || ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_NEARFULL))) {
                        err = vfs_fsync_range(file, pos, pos + ret - 1, 1);
                        if (err < 0)
                                ret = err;
                }

                if (dirty)
                        __mark_inode_dirty(inode, dirty);
                goto out;
        }

        if (ret >= 0) {
                int dirty;
                spin_lock(&ci->i_ceph_lock);
                dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
                spin_unlock(&ci->i_ceph_lock);
                if (dirty)
                        __mark_inode_dirty(inode, dirty);
        }

out_put:
        dout("aio_write %p %llx.%llx %llu~%u dropping cap refs on %s\n",
             inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len,
             ceph_cap_string(got));
        ceph_put_cap_refs(ci, got);

out:
        if (ret == -EOLDSNAPC) {
                dout("aio_write %p %llx.%llx %llu~%u got EOLDSNAPC, retrying\n",
                     inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len);
                goto retry_snap;
        }

        return ret;
}

/*
 * llseek.  be sure to verify file size on SEEK_END.
 */
static loff_t ceph_llseek(struct file *file, loff_t offset, int origin)
{
        struct inode *inode = file->f_mapping->host;
        int ret;

        mutex_lock(&inode->i_mutex);
        __ceph_do_pending_vmtruncate(inode);

        if (origin == SEEK_END || origin == SEEK_DATA || origin == SEEK_HOLE) {
                ret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE);
                if (ret < 0) {
                        offset = ret;
                        goto out;
                }
        }

        switch (origin) {
        case SEEK_END:
                offset += inode->i_size;
                break;
        case SEEK_CUR:
                /*
                 * Here we special-case the lseek(fd, 0, SEEK_CUR)
                 * position-querying operation.  Avoid rewriting the "same"
                 * f_pos value back to the file because a concurrent read(),
                 * write() or lseek() might have altered it.
                 */
                if (offset == 0) {
                        offset = file->f_pos;
                        goto out;
                }
                offset += file->f_pos;
                break;
        case SEEK_DATA:
                if (offset >= inode->i_size) {
                        offset = -ENXIO;
                        goto out;
                }
                break;
        case SEEK_HOLE:
                if (offset >= inode->i_size) {
                        offset = -ENXIO;
                        goto out;
                }
                offset = inode->i_size;
                break;
        }

        if (offset < 0 || offset > inode->i_sb->s_maxbytes) {
                offset = -EINVAL;
                goto out;
        }

        /* Special lock needed here? */
        if (offset != file->f_pos) {
                file->f_pos = offset;
                file->f_version = 0;
        }

out:
        mutex_unlock(&inode->i_mutex);
        return offset;
}
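
/*
 * Note on SEEK_DATA/SEEK_HOLE above: holes are not tracked here, so
 * the file is treated as a single data extent: SEEK_DATA returns the
 * requested offset unchanged, SEEK_HOLE returns i_size, and offsets
 * at or past EOF yield -ENXIO.  e.g. lseek(fd, 0, SEEK_HOLE) on a
 * 4 KB file returns 4096.
 */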

const struct file_operations ceph_file_fops = {
        .open = ceph_open,
        .release = ceph_release,
        .llseek = ceph_llseek,
        .read = do_sync_read,
        .write = do_sync_write,
        .aio_read = ceph_aio_read,
        .aio_write = ceph_aio_write,
        .mmap = ceph_mmap,
        .fsync = ceph_fsync,
        .lock = ceph_lock,
        .flock = ceph_flock,
        .splice_read = generic_file_splice_read,
        .splice_write = generic_file_splice_write,
        .unlocked_ioctl = ceph_ioctl,
        .compat_ioctl = ceph_ioctl,
};