GNU Linux-libre 4.9.318-gnu1
drivers/md/dm-kcopyd.c
/*
 * Copyright (C) 2002 Sistina Software (UK) Limited.
 * Copyright (C) 2006 Red Hat GmbH
 *
 * This file is released under the GPL.
 *
 * Kcopyd provides a simple interface for copying an area of one
 * block-device to one or more other block-devices, with an asynchronous
 * completion notification.
 */

#include <linux/types.h>
#include <linux/atomic.h>
#include <linux/blkdev.h>
#include <linux/fs.h>
#include <linux/init.h>
#include <linux/list.h>
#include <linux/mempool.h>
#include <linux/module.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/vmalloc.h>
#include <linux/workqueue.h>
#include <linux/mutex.h>
#include <linux/delay.h>
#include <linux/device-mapper.h>
#include <linux/dm-kcopyd.h>

#include "dm-core.h"

#define SUB_JOB_SIZE    128
#define SPLIT_COUNT     8
#define MIN_JOBS        8
#define RESERVE_PAGES   (DIV_ROUND_UP(SUB_JOB_SIZE << SECTOR_SHIFT, PAGE_SIZE))
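
/*
 * Worked example of the reservation arithmetic above, assuming the common
 * 4 KiB PAGE_SIZE (other page sizes change the numbers, not the logic):
 * a sub job covers SUB_JOB_SIZE = 128 sectors = 128 << SECTOR_SHIFT =
 * 65536 bytes, so RESERVE_PAGES = DIV_ROUND_UP(65536, 4096) = 16 pages are
 * held back per client, enough to feed one sub job even when alloc_page()
 * fails under memory pressure (see kcopyd_get_pages()).
 */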

/*-----------------------------------------------------------------
 * Each kcopyd client has its own little pool of preallocated
 * pages for kcopyd io.
 *---------------------------------------------------------------*/
struct dm_kcopyd_client {
        struct page_list *pages;
        unsigned nr_reserved_pages;
        unsigned nr_free_pages;

        struct dm_io_client *io_client;

        wait_queue_head_t destroyq;
        atomic_t nr_jobs;

        mempool_t *job_pool;

        struct workqueue_struct *kcopyd_wq;
        struct work_struct kcopyd_work;

        struct dm_kcopyd_throttle *throttle;

/*
 * We maintain four lists of jobs:
 *
 * i)   jobs waiting for pages
 * ii)  jobs that have pages, and are waiting for the io to be issued.
 * iii) jobs that don't need to do any IO and just run a callback
 * iv)  jobs that have completed.
 *
 * All four of these are protected by job_lock.
 */
        spinlock_t job_lock;
        struct list_head callback_jobs;
        struct list_head complete_jobs;
        struct list_head io_jobs;
        struct list_head pages_jobs;
};

static struct page_list zero_page_list;

static DEFINE_SPINLOCK(throttle_spinlock);

/*
 * IO/IDLE accounting slowly decays after (1 << ACCOUNT_INTERVAL_SHIFT) period.
 * When total_period >= (1 << ACCOUNT_INTERVAL_SHIFT) the counters are divided
 * by 2.
 */
#define ACCOUNT_INTERVAL_SHIFT          SHIFT_HZ

/*
 * Sleep this number of milliseconds.
 *
 * The value was decided experimentally.
 * Smaller values seem to cause an increased copy rate above the limit.
 * The reason for this is unknown but possibly due to jiffies rounding errors
 * or read/write cache inside the disk.
 */
#define SLEEP_MSEC                      100

/*
 * Maximum number of sleep events. There is a theoretical livelock if many
 * kcopyd clients do work simultaneously, which this limit avoids.
 */
#define MAX_SLEEPS                      10

static void io_job_start(struct dm_kcopyd_throttle *t)
{
        unsigned throttle, now, difference;
        int slept = 0, skew;

        if (unlikely(!t))
                return;

try_again:
        spin_lock_irq(&throttle_spinlock);

        throttle = ACCESS_ONCE(t->throttle);

        if (likely(throttle >= 100))
                goto skip_limit;

        now = jiffies;
        difference = now - t->last_jiffies;
        t->last_jiffies = now;
        if (t->num_io_jobs)
                t->io_period += difference;
        t->total_period += difference;

        /*
         * Maintain sane values if we got a temporary overflow.
         */
        if (unlikely(t->io_period > t->total_period))
                t->io_period = t->total_period;

        if (unlikely(t->total_period >= (1 << ACCOUNT_INTERVAL_SHIFT))) {
                int shift = fls(t->total_period >> ACCOUNT_INTERVAL_SHIFT);
                t->total_period >>= shift;
                t->io_period >>= shift;
        }

        skew = t->io_period - throttle * t->total_period / 100;

        if (unlikely(skew > 0) && slept < MAX_SLEEPS) {
                slept++;
                spin_unlock_irq(&throttle_spinlock);
                msleep(SLEEP_MSEC);
                goto try_again;
        }

skip_limit:
        t->num_io_jobs++;

        spin_unlock_irq(&throttle_spinlock);
}
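
/*
 * Hypothetical worked example of the skew test above: with t->throttle = 50
 * (percent), t->total_period = 200 jiffies and t->io_period = 120 jiffies,
 * the allowed io time is 50 * 200 / 100 = 100 jiffies, so
 * skew = 120 - 100 = 20 > 0 and the caller sleeps SLEEP_MSEC before
 * re-checking, at most MAX_SLEEPS times.  Once io_period is at or below
 * the quota, skew <= 0 and the io job is started immediately.
 */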

static void io_job_finish(struct dm_kcopyd_throttle *t)
{
        unsigned long flags;

        if (unlikely(!t))
                return;

        spin_lock_irqsave(&throttle_spinlock, flags);

        t->num_io_jobs--;

        if (likely(ACCESS_ONCE(t->throttle) >= 100))
                goto skip_limit;

        if (!t->num_io_jobs) {
                unsigned now, difference;

                now = jiffies;
                difference = now - t->last_jiffies;
                t->last_jiffies = now;

                t->io_period += difference;
                t->total_period += difference;

                /*
                 * Maintain sane values if we got a temporary overflow.
                 */
                if (unlikely(t->io_period > t->total_period))
                        t->io_period = t->total_period;
        }

skip_limit:
        spin_unlock_irqrestore(&throttle_spinlock, flags);
}


static void wake(struct dm_kcopyd_client *kc)
{
        queue_work(kc->kcopyd_wq, &kc->kcopyd_work);
}

/*
 * Obtain one page for the use of kcopyd.
 */
static struct page_list *alloc_pl(gfp_t gfp)
{
        struct page_list *pl;

        pl = kmalloc(sizeof(*pl), gfp);
        if (!pl)
                return NULL;

        pl->page = alloc_page(gfp);
        if (!pl->page) {
                kfree(pl);
                return NULL;
        }

        return pl;
}

static void free_pl(struct page_list *pl)
{
        __free_page(pl->page);
        kfree(pl);
}

/*
 * Add the provided pages to a client's free page list, releasing
 * back to the system any beyond the reserved_pages limit.
 */
static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl)
{
        struct page_list *next;

        do {
                next = pl->next;

                if (kc->nr_free_pages >= kc->nr_reserved_pages)
                        free_pl(pl);
                else {
                        pl->next = kc->pages;
                        kc->pages = pl;
                        kc->nr_free_pages++;
                }

                pl = next;
        } while (pl);
}

static int kcopyd_get_pages(struct dm_kcopyd_client *kc,
                            unsigned int nr, struct page_list **pages)
{
        struct page_list *pl;

        *pages = NULL;

        do {
                pl = alloc_pl(__GFP_NOWARN | __GFP_NORETRY | __GFP_KSWAPD_RECLAIM);
                if (unlikely(!pl)) {
                        /* Use reserved pages */
                        pl = kc->pages;
                        if (unlikely(!pl))
                                goto out_of_memory;
                        kc->pages = pl->next;
                        kc->nr_free_pages--;
                }
                pl->next = *pages;
                *pages = pl;
        } while (--nr);

        return 0;

out_of_memory:
        if (*pages)
                kcopyd_put_pages(kc, *pages);
        return -ENOMEM;
}

/*
 * These three functions resize the page pool.
 */
static void drop_pages(struct page_list *pl)
{
        struct page_list *next;

        while (pl) {
                next = pl->next;
                free_pl(pl);
                pl = next;
        }
}

/*
 * Allocate and reserve nr_pages for the use of a specific client.
 */
static int client_reserve_pages(struct dm_kcopyd_client *kc, unsigned nr_pages)
{
        unsigned i;
        struct page_list *pl = NULL, *next;

        for (i = 0; i < nr_pages; i++) {
                next = alloc_pl(GFP_KERNEL);
                if (!next) {
                        if (pl)
                                drop_pages(pl);
                        return -ENOMEM;
                }
                next->next = pl;
                pl = next;
        }

        kc->nr_reserved_pages += nr_pages;
        kcopyd_put_pages(kc, pl);

        return 0;
}

static void client_free_pages(struct dm_kcopyd_client *kc)
{
        BUG_ON(kc->nr_free_pages != kc->nr_reserved_pages);
        drop_pages(kc->pages);
        kc->pages = NULL;
        kc->nr_free_pages = kc->nr_reserved_pages = 0;
}

/*-----------------------------------------------------------------
 * kcopyd_jobs need to be allocated by the *clients* of kcopyd;
 * for this reason we use a mempool to prevent the client from
 * ever having to do io (which could cause a deadlock).
 *---------------------------------------------------------------*/
struct kcopyd_job {
        struct dm_kcopyd_client *kc;
        struct list_head list;
        unsigned long flags;

        /*
         * Error state of the job.
         */
        int read_err;
        unsigned long write_err;

        /*
         * READ, WRITE or (when zeroing and all destinations support it)
         * REQ_OP_WRITE_SAME.
         */
        int rw;
        struct dm_io_region source;

        /*
         * The destinations for the transfer.
         */
        unsigned int num_dests;
        struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS];

        struct page_list *pages;

        /*
         * Set this to ensure you are notified when the job has
         * completed.  'context' is for callback to use.
         */
        dm_kcopyd_notify_fn fn;
        void *context;

        /*
         * These fields are only used if the job has been split
         * into more manageable parts.
         */
        struct mutex lock;
        atomic_t sub_jobs;
        sector_t progress;

        struct kcopyd_job *master_job;
};

static struct kmem_cache *_job_cache;

int __init dm_kcopyd_init(void)
{
        _job_cache = kmem_cache_create("kcopyd_job",
                                sizeof(struct kcopyd_job) * (SPLIT_COUNT + 1),
                                __alignof__(struct kcopyd_job), 0, NULL);
        if (!_job_cache)
                return -ENOMEM;

        zero_page_list.next = &zero_page_list;
        zero_page_list.page = ZERO_PAGE(0);

        return 0;
}

void dm_kcopyd_exit(void)
{
        kmem_cache_destroy(_job_cache);
        _job_cache = NULL;
}

/*
 * Functions to push and pop jobs on the job lists: pop() takes a job from
 * the head of a list, push() appends to the tail and push_head() requeues
 * at the head.
 */
static struct kcopyd_job *pop(struct list_head *jobs,
                              struct dm_kcopyd_client *kc)
{
        struct kcopyd_job *job = NULL;
        unsigned long flags;

        spin_lock_irqsave(&kc->job_lock, flags);

        if (!list_empty(jobs)) {
                job = list_entry(jobs->next, struct kcopyd_job, list);
                list_del(&job->list);
        }
        spin_unlock_irqrestore(&kc->job_lock, flags);

        return job;
}

static void push(struct list_head *jobs, struct kcopyd_job *job)
{
        unsigned long flags;
        struct dm_kcopyd_client *kc = job->kc;

        spin_lock_irqsave(&kc->job_lock, flags);
        list_add_tail(&job->list, jobs);
        spin_unlock_irqrestore(&kc->job_lock, flags);
}


static void push_head(struct list_head *jobs, struct kcopyd_job *job)
{
        unsigned long flags;
        struct dm_kcopyd_client *kc = job->kc;

        spin_lock_irqsave(&kc->job_lock, flags);
        list_add(&job->list, jobs);
        spin_unlock_irqrestore(&kc->job_lock, flags);
}

/*
 * These three functions process 1 item from the corresponding
 * job list.
 *
 * They return:
 * < 0: error
 *   0: success
 * > 0: can't process yet.
 */
static int run_complete_job(struct kcopyd_job *job)
{
        void *context = job->context;
        int read_err = job->read_err;
        unsigned long write_err = job->write_err;
        dm_kcopyd_notify_fn fn = job->fn;
        struct dm_kcopyd_client *kc = job->kc;

        if (job->pages && job->pages != &zero_page_list)
                kcopyd_put_pages(kc, job->pages);
        /*
         * If this is the master job, the sub jobs have already
         * completed so we can free everything.
         */
        if (job->master_job == job)
                mempool_free(job, kc->job_pool);
        fn(read_err, write_err, context);

        if (atomic_dec_and_test(&kc->nr_jobs))
                wake_up(&kc->destroyq);

        cond_resched();

        return 0;
}

static void complete_io(unsigned long error, void *context)
{
        struct kcopyd_job *job = (struct kcopyd_job *) context;
        struct dm_kcopyd_client *kc = job->kc;

        io_job_finish(kc->throttle);

        if (error) {
                if (op_is_write(job->rw))
                        job->write_err |= error;
                else
                        job->read_err = 1;

                if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
                        push(&kc->complete_jobs, job);
                        wake(kc);
                        return;
                }
        }

        if (op_is_write(job->rw))
                push(&kc->complete_jobs, job);

        else {
                job->rw = WRITE;
                push(&kc->io_jobs, job);
        }

        wake(kc);
}

/*
 * Issue the io for a particular job: read the source region, or write to
 * all the destination regions.
 */
static int run_io_job(struct kcopyd_job *job)
{
        int r;
        struct dm_io_request io_req = {
                .bi_op = job->rw,
                .bi_op_flags = 0,
                .mem.type = DM_IO_PAGE_LIST,
                .mem.ptr.pl = job->pages,
                .mem.offset = 0,
                .notify.fn = complete_io,
                .notify.context = job,
                .client = job->kc->io_client,
        };

        io_job_start(job->kc->throttle);

        if (job->rw == READ)
                r = dm_io(&io_req, 1, &job->source, NULL);
        else
                r = dm_io(&io_req, job->num_dests, job->dests, NULL);

        return r;
}

static int run_pages_job(struct kcopyd_job *job)
{
        int r;
        unsigned nr_pages = dm_div_up(job->dests[0].count, PAGE_SIZE >> 9);

        r = kcopyd_get_pages(job->kc, nr_pages, &job->pages);
        if (!r) {
                /* this job is ready for io */
                push(&job->kc->io_jobs, job);
                return 0;
        }

        if (r == -ENOMEM)
                /* can't complete now */
                return 1;

        return r;
}

/*
 * Run through a list for as long as possible.  Returns the count
 * of successful jobs.
 */
static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc,
                        int (*fn) (struct kcopyd_job *))
{
        struct kcopyd_job *job;
        int r, count = 0;

        while ((job = pop(jobs, kc))) {

                r = fn(job);

                if (r < 0) {
                        /* error this rogue job */
                        if (op_is_write(job->rw))
                                job->write_err = (unsigned long) -1L;
                        else
                                job->read_err = 1;
                        push(&kc->complete_jobs, job);
                        break;
                }

                if (r > 0) {
                        /*
                         * We couldn't service this job ATM, so
                         * push this job back onto the list.
                         */
                        push_head(jobs, job);
                        break;
                }

                count++;
        }

        return count;
}

/*
 * kcopyd does this every time it's woken up.
 */
static void do_work(struct work_struct *work)
{
        struct dm_kcopyd_client *kc = container_of(work,
                                        struct dm_kcopyd_client, kcopyd_work);
        struct blk_plug plug;
        unsigned long flags;

        /*
         * The order that these are called is *very* important.
         * complete jobs can free some pages for pages jobs.
         * Pages jobs when successful will jump onto the io jobs
         * list.  io jobs call wake when they complete and it all
         * starts again.
         */
        spin_lock_irqsave(&kc->job_lock, flags);
        list_splice_tail_init(&kc->callback_jobs, &kc->complete_jobs);
        spin_unlock_irqrestore(&kc->job_lock, flags);

        blk_start_plug(&plug);
        process_jobs(&kc->complete_jobs, kc, run_complete_job);
        process_jobs(&kc->pages_jobs, kc, run_pages_job);
        process_jobs(&kc->io_jobs, kc, run_io_job);
        blk_finish_plug(&plug);
}

/*
 * If we are copying a small region we just dispatch a single job
 * to do the copy, otherwise the io has to be split up into many
 * jobs.
 */
static void dispatch_job(struct kcopyd_job *job)
{
        struct dm_kcopyd_client *kc = job->kc;
        atomic_inc(&kc->nr_jobs);
        if (unlikely(!job->source.count))
                push(&kc->callback_jobs, job);
        else if (job->pages == &zero_page_list)
                push(&kc->io_jobs, job);
        else
                push(&kc->pages_jobs, job);
        wake(kc);
}

static void segment_complete(int read_err, unsigned long write_err,
                             void *context)
{
        /* FIXME: tidy this function */
        sector_t progress = 0;
        sector_t count = 0;
        struct kcopyd_job *sub_job = (struct kcopyd_job *) context;
        struct kcopyd_job *job = sub_job->master_job;
        struct dm_kcopyd_client *kc = job->kc;

        mutex_lock(&job->lock);

        /* update the error */
        if (read_err)
                job->read_err = 1;

        if (write_err)
                job->write_err |= write_err;

        /*
         * Only dispatch more work if there hasn't been an error.
         */
        if ((!job->read_err && !job->write_err) ||
            test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
                /* get the next chunk of work */
                progress = job->progress;
                count = job->source.count - progress;
                if (count) {
                        if (count > SUB_JOB_SIZE)
                                count = SUB_JOB_SIZE;

                        job->progress += count;
                }
        }
        mutex_unlock(&job->lock);

        if (count) {
                int i;

                *sub_job = *job;
                sub_job->source.sector += progress;
                sub_job->source.count = count;

                for (i = 0; i < job->num_dests; i++) {
                        sub_job->dests[i].sector += progress;
                        sub_job->dests[i].count = count;
                }

                sub_job->fn = segment_complete;
                sub_job->context = sub_job;
                dispatch_job(sub_job);

        } else if (atomic_dec_and_test(&job->sub_jobs)) {

                /*
                 * Queue the completion callback to the kcopyd thread.
                 *
                 * Some callers assume that all the completions are called
                 * from a single thread and don't race with each other.
                 *
                 * We must not call the callback directly here because this
                 * code may not be executing in the thread.
                 */
                push(&kc->complete_jobs, job);
                wake(kc);
        }
}

/*
 * Create some sub jobs to share the work between them.
 */
static void split_job(struct kcopyd_job *master_job)
{
        int i;

        atomic_inc(&master_job->kc->nr_jobs);

        atomic_set(&master_job->sub_jobs, SPLIT_COUNT);
        for (i = 0; i < SPLIT_COUNT; i++) {
                master_job[i + 1].master_job = master_job;
                segment_complete(0, 0u, &master_job[i + 1]);
        }
}
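
/*
 * Note on the layout used by split_job(): a _job_cache object is an array
 * of SPLIT_COUNT + 1 struct kcopyd_job (see dm_kcopyd_init()), with the
 * master job at index 0 and the sub jobs at indices 1..SPLIT_COUNT.  Each
 * sub job claims the next chunk of at most SUB_JOB_SIZE sectors in
 * segment_complete() until job->progress reaches job->source.count, so at
 * most SPLIT_COUNT sub jobs of one copy are in flight at a time.
 */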

int dm_kcopyd_copy(struct dm_kcopyd_client *kc, struct dm_io_region *from,
                   unsigned int num_dests, struct dm_io_region *dests,
                   unsigned int flags, dm_kcopyd_notify_fn fn, void *context)
{
        struct kcopyd_job *job;
        int i;

        /*
         * Allocate an array of jobs consisting of one master job
         * followed by SPLIT_COUNT sub jobs.
         */
        job = mempool_alloc(kc->job_pool, GFP_NOIO);

        /*
         * set up for the read.
         */
        job->kc = kc;
        job->flags = flags;
        job->read_err = 0;
        job->write_err = 0;

        job->num_dests = num_dests;
        memcpy(&job->dests, dests, sizeof(*dests) * num_dests);

        if (from) {
                job->source = *from;
                job->pages = NULL;
                job->rw = READ;
        } else {
                memset(&job->source, 0, sizeof job->source);
                job->source.count = job->dests[0].count;
                job->pages = &zero_page_list;

                /*
                 * Use WRITE SAME to optimize zeroing if all dests support it.
                 */
                job->rw = REQ_OP_WRITE_SAME;
                for (i = 0; i < job->num_dests; i++)
                        if (!bdev_write_same(job->dests[i].bdev)) {
                                job->rw = WRITE;
                                break;
                        }
        }

        job->fn = fn;
        job->context = context;
        job->master_job = job;

        if (job->source.count <= SUB_JOB_SIZE)
                dispatch_job(job);
        else {
                mutex_init(&job->lock);
                job->progress = 0;
                split_job(job);
        }

        return 0;
}
EXPORT_SYMBOL(dm_kcopyd_copy);

int dm_kcopyd_zero(struct dm_kcopyd_client *kc,
                   unsigned num_dests, struct dm_io_region *dests,
                   unsigned flags, dm_kcopyd_notify_fn fn, void *context)
{
        return dm_kcopyd_copy(kc, NULL, num_dests, dests, flags, fn, context);
}
EXPORT_SYMBOL(dm_kcopyd_zero);
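
/*
 * Illustrative sketch (kept out of the build) of how a client might drive
 * the copy interface above.  The devices, sector numbers and the
 * example_copy_done()/example_copy() names are hypothetical; a real caller
 * gets its struct block_device pointers from its own context, e.g. a dm
 * target's dm_dev, and usually keeps one long-lived client around.
 */
#if 0
static void example_copy_done(int read_err, unsigned long write_err,
                              void *context)
{
        /* Runs on the kcopyd workqueue once the whole copy has finished. */
        if (read_err || write_err)
                DMERR("example copy failed: read_err=%d write_err=0x%lx",
                      read_err, write_err);
        complete(context);      /* wake up the waiting submitter */
}

static int example_copy(struct block_device *src_bdev,
                        struct block_device *dst_bdev, sector_t nr_sectors)
{
        struct dm_kcopyd_client *kc;
        struct dm_io_region src, dst;
        DECLARE_COMPLETION_ONSTACK(done);

        kc = dm_kcopyd_client_create(NULL);     /* NULL: no throttling */
        if (IS_ERR(kc))
                return PTR_ERR(kc);

        src.bdev = src_bdev;
        src.sector = 0;
        src.count = nr_sectors;

        dst.bdev = dst_bdev;
        dst.sector = 0;
        dst.count = nr_sectors;

        dm_kcopyd_copy(kc, &src, 1, &dst, 0, example_copy_done, &done);
        wait_for_completion(&done);

        dm_kcopyd_client_destroy(kc);
        return 0;
}
#endif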

void *dm_kcopyd_prepare_callback(struct dm_kcopyd_client *kc,
                                 dm_kcopyd_notify_fn fn, void *context)
{
        struct kcopyd_job *job;

        job = mempool_alloc(kc->job_pool, GFP_NOIO);

        memset(job, 0, sizeof(struct kcopyd_job));
        job->kc = kc;
        job->fn = fn;
        job->context = context;
        job->master_job = job;

        atomic_inc(&kc->nr_jobs);

        return job;
}
EXPORT_SYMBOL(dm_kcopyd_prepare_callback);

void dm_kcopyd_do_callback(void *j, int read_err, unsigned long write_err)
{
        struct kcopyd_job *job = j;
        struct dm_kcopyd_client *kc = job->kc;

        job->read_err = read_err;
        job->write_err = write_err;

        push(&kc->callback_jobs, job);
        wake(kc);
}
EXPORT_SYMBOL(dm_kcopyd_do_callback);
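
/*
 * Illustrative sketch (kept out of the build) of the prepare/do pair above:
 * a caller that performs some io itself, but wants its notification to run
 * from the kcopyd workqueue, serialised with its other kcopyd callbacks,
 * can prepare the job up front and fire it from its own completion path.
 * The example_deferred_notify() name is hypothetical.
 */
#if 0
static void example_deferred_notify(struct dm_kcopyd_client *kc,
                                    dm_kcopyd_notify_fn fn, void *context)
{
        void *j = dm_kcopyd_prepare_callback(kc, fn, context);

        /* ... issue io by other means; from its completion handler: ... */
        dm_kcopyd_do_callback(j, 0 /* read_err */, 0 /* write_err */);
}
#endif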

/*
 * Cancels a kcopyd job, e.g. someone might be deactivating a
 * mirror.
 */
#if 0
int kcopyd_cancel(struct kcopyd_job *job, int block)
{
        /* FIXME: finish */
        return -1;
}
#endif  /*  0  */

/*-----------------------------------------------------------------
 * Client setup
 *---------------------------------------------------------------*/
struct dm_kcopyd_client *dm_kcopyd_client_create(struct dm_kcopyd_throttle *throttle)
{
        int r = -ENOMEM;
        struct dm_kcopyd_client *kc;

        kc = kzalloc(sizeof(*kc), GFP_KERNEL);
        if (!kc)
                return ERR_PTR(-ENOMEM);

        spin_lock_init(&kc->job_lock);
        INIT_LIST_HEAD(&kc->callback_jobs);
        INIT_LIST_HEAD(&kc->complete_jobs);
        INIT_LIST_HEAD(&kc->io_jobs);
        INIT_LIST_HEAD(&kc->pages_jobs);
        kc->throttle = throttle;

        kc->job_pool = mempool_create_slab_pool(MIN_JOBS, _job_cache);
        if (!kc->job_pool)
                goto bad_slab;

        INIT_WORK(&kc->kcopyd_work, do_work);
        kc->kcopyd_wq = alloc_workqueue("kcopyd", WQ_MEM_RECLAIM, 0);
        if (!kc->kcopyd_wq)
                goto bad_workqueue;

        kc->pages = NULL;
        kc->nr_reserved_pages = kc->nr_free_pages = 0;
        r = client_reserve_pages(kc, RESERVE_PAGES);
        if (r)
                goto bad_client_pages;

        kc->io_client = dm_io_client_create();
        if (IS_ERR(kc->io_client)) {
                r = PTR_ERR(kc->io_client);
                goto bad_io_client;
        }

        init_waitqueue_head(&kc->destroyq);
        atomic_set(&kc->nr_jobs, 0);

        return kc;

bad_io_client:
        client_free_pages(kc);
bad_client_pages:
        destroy_workqueue(kc->kcopyd_wq);
bad_workqueue:
        mempool_destroy(kc->job_pool);
bad_slab:
        kfree(kc);

        return ERR_PTR(r);
}
EXPORT_SYMBOL(dm_kcopyd_client_create);

void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc)
{
        /* Wait for completion of all jobs submitted by this client. */
        wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));

        BUG_ON(!list_empty(&kc->callback_jobs));
        BUG_ON(!list_empty(&kc->complete_jobs));
        BUG_ON(!list_empty(&kc->io_jobs));
        BUG_ON(!list_empty(&kc->pages_jobs));
        destroy_workqueue(kc->kcopyd_wq);
        dm_io_client_destroy(kc->io_client);
        client_free_pages(kc);
        mempool_destroy(kc->job_pool);
        kfree(kc);
}
EXPORT_SYMBOL(dm_kcopyd_client_destroy);