Linux Kernel 3.7.1
dm-kcopyd.c
/*
 * Copyright (C) 2002 Sistina Software (UK) Limited.
 * Copyright (C) 2006 Red Hat GmbH
 *
 * This file is released under the GPL.
 *
 * Kcopyd provides a simple interface for copying an area of one
 * block-device to one or more other block-devices, with an asynchronous
 * completion notification.
 */
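/*
 * Illustrative sketch (not part of the original file): how a caller that
 * already owns a dm_kcopyd_client might describe a source and a single
 * destination and issue an asynchronous copy.  The names example_done()
 * and example_copy() are hypothetical.
 */
#if 0
static void example_done(int read_err, unsigned long write_err, void *context)
{
	/* Runs from the kcopyd workqueue once every destination is written. */
}

static int example_copy(struct dm_kcopyd_client *kc,
			struct block_device *src_bdev,
			struct block_device *dst_bdev,
			sector_t sector, sector_t nr_sectors)
{
	struct dm_io_region from = {
		.bdev = src_bdev,
		.sector = sector,
		.count = nr_sectors,
	};
	struct dm_io_region to = {
		.bdev = dst_bdev,
		.sector = sector,
		.count = nr_sectors,
	};

	/* flags == 0: errors are simply reported to example_done() */
	return dm_kcopyd_copy(kc, &from, 1, &to, 0, example_done, NULL);
}
#endif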

#include <linux/types.h>
#include <linux/atomic.h>
#include <linux/blkdev.h>
#include <linux/fs.h>
#include <linux/init.h>
#include <linux/list.h>
#include <linux/mempool.h>
#include <linux/module.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/vmalloc.h>
#include <linux/workqueue.h>
#include <linux/mutex.h>
#include <linux/device-mapper.h>
#include <linux/dm-kcopyd.h>

#include "dm.h"

#define SUB_JOB_SIZE	128
#define SPLIT_COUNT	8
#define MIN_JOBS	8
#define RESERVE_PAGES	(DIV_ROUND_UP(SUB_JOB_SIZE << SECTOR_SHIFT, PAGE_SIZE))
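/*
 * Worked example: with 512-byte sectors (SECTOR_SHIFT == 9) and a typical
 * 4 KiB PAGE_SIZE, RESERVE_PAGES = DIV_ROUND_UP(128 * 512, 4096) = 16 pages
 * reserved per client.
 */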

/*-----------------------------------------------------------------
 * Each kcopyd client has its own little pool of preallocated
 * pages for kcopyd io.
 *---------------------------------------------------------------*/
struct dm_kcopyd_client {
	struct page_list *pages;
	unsigned nr_reserved_pages;
	unsigned nr_free_pages;

	struct dm_io_client *io_client;

	wait_queue_head_t destroyq;
	atomic_t nr_jobs;

	mempool_t *job_pool;

	struct workqueue_struct *kcopyd_wq;
	struct work_struct kcopyd_work;

/*
 * We maintain three lists of jobs:
 *
 * i) jobs waiting for pages
 * ii) jobs that have pages, and are waiting for the io to be issued.
 * iii) jobs that have completed.
 *
 * All three of these are protected by job_lock.
 */
	spinlock_t job_lock;
	struct list_head complete_jobs;
	struct list_head io_jobs;
	struct list_head pages_jobs;
};

static struct page_list zero_page_list;

static void wake(struct dm_kcopyd_client *kc)
{
	queue_work(kc->kcopyd_wq, &kc->kcopyd_work);
}

/*
 * Obtain one page for the use of kcopyd.
 */
static struct page_list *alloc_pl(gfp_t gfp)
{
	struct page_list *pl;

	pl = kmalloc(sizeof(*pl), gfp);
	if (!pl)
		return NULL;

	pl->page = alloc_page(gfp);
	if (!pl->page) {
		kfree(pl);
		return NULL;
	}

	return pl;
}

static void free_pl(struct page_list *pl)
{
	__free_page(pl->page);
	kfree(pl);
}

/*
 * Add the provided pages to a client's free page list, releasing
 * back to the system any beyond the reserved_pages limit.
 */
static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl)
{
	struct page_list *next;

	do {
		next = pl->next;

		if (kc->nr_free_pages >= kc->nr_reserved_pages)
			free_pl(pl);
		else {
			pl->next = kc->pages;
			kc->pages = pl;
			kc->nr_free_pages++;
		}

		pl = next;
	} while (pl);
}

static int kcopyd_get_pages(struct dm_kcopyd_client *kc,
			    unsigned int nr, struct page_list **pages)
{
	struct page_list *pl;

	*pages = NULL;

	do {
		pl = alloc_pl(__GFP_NOWARN | __GFP_NORETRY);
		if (unlikely(!pl)) {
			/* Use reserved pages */
			pl = kc->pages;
			if (unlikely(!pl))
				goto out_of_memory;
			kc->pages = pl->next;
			kc->nr_free_pages--;
		}
		pl->next = *pages;
		*pages = pl;
	} while (--nr);

	return 0;

out_of_memory:
	if (*pages)
		kcopyd_put_pages(kc, *pages);
	return -ENOMEM;
}

/*
 * These three functions resize the page pool.
 */
static void drop_pages(struct page_list *pl)
{
	struct page_list *next;

	while (pl) {
		next = pl->next;
		free_pl(pl);
		pl = next;
	}
}

/*
 * Allocate and reserve nr_pages for the use of a specific client.
 */
static int client_reserve_pages(struct dm_kcopyd_client *kc, unsigned nr_pages)
{
	unsigned i;
	struct page_list *pl = NULL, *next;

	for (i = 0; i < nr_pages; i++) {
		next = alloc_pl(GFP_KERNEL);
		if (!next) {
			if (pl)
				drop_pages(pl);
			return -ENOMEM;
		}
		next->next = pl;
		pl = next;
	}

	kc->nr_reserved_pages += nr_pages;
	kcopyd_put_pages(kc, pl);

	return 0;
}

static void client_free_pages(struct dm_kcopyd_client *kc)
{
	BUG_ON(kc->nr_free_pages != kc->nr_reserved_pages);
	drop_pages(kc->pages);
	kc->pages = NULL;
	kc->nr_free_pages = kc->nr_reserved_pages = 0;
}

/*-----------------------------------------------------------------
 * kcopyd_jobs need to be allocated by the *clients* of kcopyd,
 * for this reason we use a mempool to prevent the client from
 * ever having to do io (which could cause a deadlock).
 *---------------------------------------------------------------*/
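/*
 * Jobs are allocated from job_pool with GFP_NOIO (see dm_kcopyd_copy() and
 * dm_kcopyd_prepare_callback() below), and the pool is created with a
 * reserve of MIN_JOBS entries, so job allocation can make forward progress
 * without issuing any io of its own.
 */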
struct kcopyd_job {
	struct dm_kcopyd_client *kc;
	struct list_head list;
	unsigned long flags;

	/*
	 * Error state of the job.
	 */
	int read_err;
	unsigned long write_err;

	/*
	 * Either READ or WRITE
	 */
	int rw;
	struct dm_io_region source;

	/*
	 * The destinations for the transfer.
	 */
	unsigned int num_dests;
	struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS];

	struct page_list *pages;

	/*
	 * Set this to ensure you are notified when the job has
	 * completed. 'context' is for callback to use.
	 */
	dm_kcopyd_notify_fn fn;
	void *context;

	/*
	 * These fields are only used if the job has been split
	 * into more manageable parts.
	 */
	struct mutex lock;
	atomic_t sub_jobs;
	sector_t progress;

	struct kcopyd_job *master_job;
};

static struct kmem_cache *_job_cache;

int __init dm_kcopyd_init(void)
{
	_job_cache = kmem_cache_create("kcopyd_job",
				sizeof(struct kcopyd_job) * (SPLIT_COUNT + 1),
				__alignof__(struct kcopyd_job), 0, NULL);
	if (!_job_cache)
		return -ENOMEM;

	zero_page_list.next = &zero_page_list;
	zero_page_list.page = ZERO_PAGE(0);

	return 0;
}

void dm_kcopyd_exit(void)
{
	kmem_cache_destroy(_job_cache);
	_job_cache = NULL;
}

/*
 * Functions to push and pop a job onto the head of a given job
 * list.
 */
static struct kcopyd_job *pop(struct list_head *jobs,
			      struct dm_kcopyd_client *kc)
{
	struct kcopyd_job *job = NULL;
	unsigned long flags;

	spin_lock_irqsave(&kc->job_lock, flags);

	if (!list_empty(jobs)) {
		job = list_entry(jobs->next, struct kcopyd_job, list);
		list_del(&job->list);
	}
	spin_unlock_irqrestore(&kc->job_lock, flags);

	return job;
}

static void push(struct list_head *jobs, struct kcopyd_job *job)
{
	unsigned long flags;
	struct dm_kcopyd_client *kc = job->kc;

	spin_lock_irqsave(&kc->job_lock, flags);
	list_add_tail(&job->list, jobs);
	spin_unlock_irqrestore(&kc->job_lock, flags);
}


static void push_head(struct list_head *jobs, struct kcopyd_job *job)
{
	unsigned long flags;
	struct dm_kcopyd_client *kc = job->kc;

	spin_lock_irqsave(&kc->job_lock, flags);
	list_add(&job->list, jobs);
	spin_unlock_irqrestore(&kc->job_lock, flags);
}

/*
 * These three functions process 1 item from the corresponding
 * job list.
 *
 * They return:
 * < 0: error
 * 0: success
 * > 0: can't process yet.
 */
static int run_complete_job(struct kcopyd_job *job)
{
	void *context = job->context;
	int read_err = job->read_err;
	unsigned long write_err = job->write_err;
	dm_kcopyd_notify_fn fn = job->fn;
	struct dm_kcopyd_client *kc = job->kc;

	if (job->pages && job->pages != &zero_page_list)
		kcopyd_put_pages(kc, job->pages);
	/*
	 * If this is the master job, the sub jobs have already
	 * completed so we can free everything.
	 */
	if (job->master_job == job)
		mempool_free(job, kc->job_pool);
	fn(read_err, write_err, context);

	if (atomic_dec_and_test(&kc->nr_jobs))
		wake_up(&kc->destroyq);

	return 0;
}

static void complete_io(unsigned long error, void *context)
{
	struct kcopyd_job *job = (struct kcopyd_job *) context;
	struct dm_kcopyd_client *kc = job->kc;

	if (error) {
		if (job->rw == WRITE)
			job->write_err |= error;
		else
			job->read_err = 1;

		if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
			push(&kc->complete_jobs, job);
			wake(kc);
			return;
		}
	}

	if (job->rw == WRITE)
		push(&kc->complete_jobs, job);

	else {
		job->rw = WRITE;
		push(&kc->io_jobs, job);
	}

	wake(kc);
}

/*
 * Request io on as many buffer heads as we can currently get for
 * a particular job.
 */
static int run_io_job(struct kcopyd_job *job)
{
	int r;
	struct dm_io_request io_req = {
		.bi_rw = job->rw,
		.mem.type = DM_IO_PAGE_LIST,
		.mem.ptr.pl = job->pages,
		.mem.offset = 0,
		.notify.fn = complete_io,
		.notify.context = job,
		.client = job->kc->io_client,
	};

	if (job->rw == READ)
		r = dm_io(&io_req, 1, &job->source, NULL);
	else
		r = dm_io(&io_req, job->num_dests, job->dests, NULL);

	return r;
}

static int run_pages_job(struct kcopyd_job *job)
{
	int r;
	unsigned nr_pages = dm_div_up(job->dests[0].count, PAGE_SIZE >> 9);

	r = kcopyd_get_pages(job->kc, nr_pages, &job->pages);
	if (!r) {
		/* this job is ready for io */
		push(&job->kc->io_jobs, job);
		return 0;
	}

	if (r == -ENOMEM)
		/* can't complete now */
		return 1;

	return r;
}

/*
 * Run through a list for as long as possible. Returns the count
 * of successful jobs.
 */
static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc,
			int (*fn) (struct kcopyd_job *))
{
	struct kcopyd_job *job;
	int r, count = 0;

	while ((job = pop(jobs, kc))) {

		r = fn(job);

		if (r < 0) {
			/* error this rogue job */
			if (job->rw == WRITE)
				job->write_err = (unsigned long) -1L;
			else
				job->read_err = 1;
			push(&kc->complete_jobs, job);
			break;
		}

		if (r > 0) {
			/*
			 * We couldn't service this job ATM, so
			 * push this job back onto the list.
			 */
			push_head(jobs, job);
			break;
		}

		count++;
	}

	return count;
}

/*
 * kcopyd does this every time it's woken up.
 */
static void do_work(struct work_struct *work)
{
	struct dm_kcopyd_client *kc = container_of(work,
					struct dm_kcopyd_client, kcopyd_work);
	struct blk_plug plug;

	/*
	 * The order that these are called is *very* important.
	 * complete jobs can free some pages for pages jobs.
	 * Pages jobs when successful will jump onto the io jobs
	 * list.  io jobs call wake when they complete and it all
	 * starts again.
	 */
	blk_start_plug(&plug);
	process_jobs(&kc->complete_jobs, kc, run_complete_job);
	process_jobs(&kc->pages_jobs, kc, run_pages_job);
	process_jobs(&kc->io_jobs, kc, run_io_job);
	blk_finish_plug(&plug);
}

/*
 * If we are copying a small region we just dispatch a single job
 * to do the copy, otherwise the io has to be split up into many
 * jobs.
 */
static void dispatch_job(struct kcopyd_job *job)
{
	struct dm_kcopyd_client *kc = job->kc;
	atomic_inc(&kc->nr_jobs);
	if (unlikely(!job->source.count))
		push(&kc->complete_jobs, job);
	else if (job->pages == &zero_page_list)
		push(&kc->io_jobs, job);
	else
		push(&kc->pages_jobs, job);
	wake(kc);
}

static void segment_complete(int read_err, unsigned long write_err,
			     void *context)
{
	/* FIXME: tidy this function */
	sector_t progress = 0;
	sector_t count = 0;
	struct kcopyd_job *sub_job = (struct kcopyd_job *) context;
	struct kcopyd_job *job = sub_job->master_job;
	struct dm_kcopyd_client *kc = job->kc;

	mutex_lock(&job->lock);

	/* update the error */
	if (read_err)
		job->read_err = 1;

	if (write_err)
		job->write_err |= write_err;

	/*
	 * Only dispatch more work if there hasn't been an error.
	 */
	if ((!job->read_err && !job->write_err) ||
	    test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
		/* get the next chunk of work */
		progress = job->progress;
		count = job->source.count - progress;
		if (count) {
			if (count > SUB_JOB_SIZE)
				count = SUB_JOB_SIZE;

			job->progress += count;
		}
	}
	mutex_unlock(&job->lock);

	if (count) {
		int i;

		*sub_job = *job;
		sub_job->source.sector += progress;
		sub_job->source.count = count;

		for (i = 0; i < job->num_dests; i++) {
			sub_job->dests[i].sector += progress;
			sub_job->dests[i].count = count;
		}

		sub_job->fn = segment_complete;
		sub_job->context = sub_job;
		dispatch_job(sub_job);

	} else if (atomic_dec_and_test(&job->sub_jobs)) {

		/*
		 * Queue the completion callback to the kcopyd thread.
		 *
		 * Some callers assume that all the completions are called
		 * from a single thread and don't race with each other.
		 *
		 * We must not call the callback directly here because this
		 * code may not be executing in the thread.
		 */
		push(&kc->complete_jobs, job);
		wake(kc);
	}
}

/*
 * Create some sub jobs to share the work between them.
 */
static void split_job(struct kcopyd_job *master_job)
{
	int i;

	atomic_inc(&master_job->kc->nr_jobs);

	atomic_set(&master_job->sub_jobs, SPLIT_COUNT);
	for (i = 0; i < SPLIT_COUNT; i++) {
		master_job[i + 1].master_job = master_job;
		segment_complete(0, 0u, &master_job[i + 1]);
	}
}

int dm_kcopyd_copy(struct dm_kcopyd_client *kc, struct dm_io_region *from,
		   unsigned int num_dests, struct dm_io_region *dests,
		   unsigned int flags, dm_kcopyd_notify_fn fn, void *context)
{
	struct kcopyd_job *job;

	/*
	 * Allocate an array of jobs consisting of one master job
	 * followed by SPLIT_COUNT sub jobs.
	 */
	job = mempool_alloc(kc->job_pool, GFP_NOIO);

	/*
	 * set up for the read.
	 */
	job->kc = kc;
	job->flags = flags;
	job->read_err = 0;
	job->write_err = 0;

	job->num_dests = num_dests;
	memcpy(&job->dests, dests, sizeof(*dests) * num_dests);

	if (from) {
		job->source = *from;
		job->pages = NULL;
		job->rw = READ;
	} else {
		memset(&job->source, 0, sizeof job->source);
		job->source.count = job->dests[0].count;
		job->pages = &zero_page_list;
		job->rw = WRITE;
	}

	job->fn = fn;
	job->context = context;
	job->master_job = job;

	if (job->source.count <= SUB_JOB_SIZE)
		dispatch_job(job);
	else {
		mutex_init(&job->lock);
		job->progress = 0;
		split_job(job);
	}

	return 0;
}
EXPORT_SYMBOL(dm_kcopyd_copy);

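/*
 * Zero the destination regions: with a NULL source, dm_kcopyd_copy() points
 * the job at zero_page_list and issues only writes (no read phase).
 */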
int dm_kcopyd_zero(struct dm_kcopyd_client *kc,
		   unsigned num_dests, struct dm_io_region *dests,
		   unsigned flags, dm_kcopyd_notify_fn fn, void *context)
{
	return dm_kcopyd_copy(kc, NULL, num_dests, dests, flags, fn, context);
}
EXPORT_SYMBOL(dm_kcopyd_zero);

void *dm_kcopyd_prepare_callback(struct dm_kcopyd_client *kc,
				 dm_kcopyd_notify_fn fn, void *context)
{
	struct kcopyd_job *job;

	job = mempool_alloc(kc->job_pool, GFP_NOIO);

	memset(job, 0, sizeof(struct kcopyd_job));
	job->kc = kc;
	job->fn = fn;
	job->context = context;
	job->master_job = job;

	atomic_inc(&kc->nr_jobs);

	return job;
}
EXPORT_SYMBOL(dm_kcopyd_prepare_callback);

void dm_kcopyd_do_callback(void *j, int read_err, unsigned long write_err)
{
	struct kcopyd_job *job = j;
	struct dm_kcopyd_client *kc = job->kc;

	job->read_err = read_err;
	job->write_err = write_err;

	push(&kc->complete_jobs, job);
	wake(kc);
}
EXPORT_SYMBOL(dm_kcopyd_do_callback);

/*
 * Cancels a kcopyd job, eg. someone might be deactivating a
 * mirror.
 */
#if 0
int kcopyd_cancel(struct kcopyd_job *job, int block)
{
	/* FIXME: finish */
	return -1;
}
#endif /* 0 */

/*-----------------------------------------------------------------
 * Client setup
 *---------------------------------------------------------------*/
struct dm_kcopyd_client *dm_kcopyd_client_create(void)
{
	int r = -ENOMEM;
	struct dm_kcopyd_client *kc;

	kc = kmalloc(sizeof(*kc), GFP_KERNEL);
	if (!kc)
		return ERR_PTR(-ENOMEM);

	spin_lock_init(&kc->job_lock);
	INIT_LIST_HEAD(&kc->complete_jobs);
	INIT_LIST_HEAD(&kc->io_jobs);
	INIT_LIST_HEAD(&kc->pages_jobs);

	kc->job_pool = mempool_create_slab_pool(MIN_JOBS, _job_cache);
	if (!kc->job_pool)
		goto bad_slab;

	INIT_WORK(&kc->kcopyd_work, do_work);
	kc->kcopyd_wq = alloc_workqueue("kcopyd",
					WQ_NON_REENTRANT | WQ_MEM_RECLAIM, 0);
	if (!kc->kcopyd_wq)
		goto bad_workqueue;

	kc->pages = NULL;
	kc->nr_reserved_pages = kc->nr_free_pages = 0;
	r = client_reserve_pages(kc, RESERVE_PAGES);
	if (r)
		goto bad_client_pages;

	kc->io_client = dm_io_client_create();
	if (IS_ERR(kc->io_client)) {
		r = PTR_ERR(kc->io_client);
		goto bad_io_client;
	}

	init_waitqueue_head(&kc->destroyq);
	atomic_set(&kc->nr_jobs, 0);

	return kc;

bad_io_client:
	client_free_pages(kc);
bad_client_pages:
	destroy_workqueue(kc->kcopyd_wq);
bad_workqueue:
	mempool_destroy(kc->job_pool);
bad_slab:
	kfree(kc);

	return ERR_PTR(r);
}
EXPORT_SYMBOL(dm_kcopyd_client_create);

void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc)
{
	/* Wait for completion of all jobs submitted by this client. */
	wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));

	BUG_ON(!list_empty(&kc->complete_jobs));
	BUG_ON(!list_empty(&kc->io_jobs));
	BUG_ON(!list_empty(&kc->pages_jobs));
	destroy_workqueue(kc->kcopyd_wq);
	dm_io_client_destroy(kc->io_client);
	client_free_pages(kc);
	mempool_destroy(kc->job_pool);
	kfree(kc);
}