GNU Linux-libre 5.10.153-gnu1
[releases.git] / drivers / block / drbd / drbd_worker.c
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /*
3    drbd_worker.c
4
5    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
6
7    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
8    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
9    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
10
11
12 */
13
14 #include <linux/module.h>
15 #include <linux/drbd.h>
16 #include <linux/sched/signal.h>
17 #include <linux/wait.h>
18 #include <linux/mm.h>
19 #include <linux/memcontrol.h>
20 #include <linux/mm_inline.h>
21 #include <linux/slab.h>
22 #include <linux/random.h>
23 #include <linux/string.h>
24 #include <linux/scatterlist.h>
25 #include <linux/part_stat.h>
26
27 #include "drbd_int.h"
28 #include "drbd_protocol.h"
29 #include "drbd_req.h"
30
/* Request generators invoked from w_resync_timer(); defined further down. */
static int make_ov_request(struct drbd_device *device, int cancel);
static int make_resync_request(struct drbd_device *device, int cancel);
33
34 /* endio handlers:
35  *   drbd_md_endio (defined here)
36  *   drbd_request_endio (defined here)
37  *   drbd_peer_request_endio (defined here)
38  *   drbd_bm_endio (defined in drbd_bitmap.c)
39  *
40  * For all these callbacks, note the following:
41  * The callbacks will be called in irq context by the IDE drivers,
42  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
43  * Try to get the locking right :)
44  *
45  */
46
47 /* used for synchronous meta data and bitmap IO
48  * submitted by drbd_md_sync_page_io()
49  */
50 void drbd_md_endio(struct bio *bio)
51 {
52         struct drbd_device *device;
53
54         device = bio->bi_private;
55         device->md_io.error = blk_status_to_errno(bio->bi_status);
56
57         /* special case: drbd_md_read() during drbd_adm_attach() */
58         if (device->ldev)
59                 put_ldev(device);
60         bio_put(bio);
61
62         /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
63          * to timeout on the lower level device, and eventually detach from it.
64          * If this io completion runs after that timeout expired, this
65          * drbd_md_put_buffer() may allow us to finally try and re-attach.
66          * During normal operation, this only puts that extra reference
67          * down to 1 again.
68          * Make sure we first drop the reference, and only then signal
69          * completion, or we may (in drbd_al_read_log()) cycle so fast into the
70          * next drbd_md_sync_page_io(), that we trigger the
71          * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
72          */
73         drbd_md_put_buffer(device);
74         device->md_io.done = 1;
75         wake_up(&device->misc_wait);
76 }
77
78 /* reads on behalf of the partner,
79  * "submitted" by the receiver
80  */
81 static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
82 {
83         unsigned long flags = 0;
84         struct drbd_peer_device *peer_device = peer_req->peer_device;
85         struct drbd_device *device = peer_device->device;
86
87         spin_lock_irqsave(&device->resource->req_lock, flags);
88         device->read_cnt += peer_req->i.size >> 9;
89         list_del(&peer_req->w.list);
90         if (list_empty(&device->read_ee))
91                 wake_up(&device->ee_wait);
92         if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
93                 __drbd_chk_io_error(device, DRBD_READ_ERROR);
94         spin_unlock_irqrestore(&device->resource->req_lock, flags);
95
96         drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
97         put_ldev(device);
98 }
99
/* writes on behalf of the partner, or resync writes,
 * "submitted" by the receiver, final stage.
 *
 * Reached from drbd_peer_request_endio() once the last pending bio of a
 * write peer request has completed.  Moves the request to device->done_ee,
 * schedules the ack sender work, and finishes the bookkeeping that goes
 * with the request (resync extent, activity log, ldev reference).
 */
void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
{
        unsigned long flags = 0;
        struct drbd_peer_device *peer_device = peer_req->peer_device;
        struct drbd_device *device = peer_device->device;
        struct drbd_connection *connection = peer_device->connection;
        struct drbd_interval i;
        int do_wake;
        u64 block_id;
        int do_al_complete_io;

        /* after we moved peer_req to done_ee,
         * we may no longer access it,
         * it may be freed/reused already!
         * (as soon as we release the req_lock)
         *
         * So take local copies of everything needed after the unlock, and
         * clear EE_CALL_AL_COMPLETE_IO in the request, since the
         * drbd_al_complete_io() call is made from here (see below). */
        i = peer_req->i;
        do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
        block_id = peer_req->block_id;
        peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;

        if (peer_req->flags & EE_WAS_ERROR) {
                /* In protocol != C, we usually do not send write acks.
                 * In case of a write error, send the neg ack anyways.
                 * inc_unacked() only if the flag was not already set. */
                if (!__test_and_set_bit(__EE_SEND_WRITE_ACK, &peer_req->flags))
                        inc_unacked(device);
                drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
        }

        spin_lock_irqsave(&device->resource->req_lock, flags);
        /* i.size is in bytes, writ_cnt counts 512 byte sectors */
        device->writ_cnt += peer_req->i.size >> 9;
        list_move_tail(&peer_req->w.list, &device->done_ee);

        /*
         * Do not remove from the write_requests tree here: we did not send the
         * Ack yet and did not wake possibly waiting conflicting requests.
         * Removed from the tree from "drbd_process_done_ee" within the
         * appropriate dw.cb (e_end_block/e_end_resync_block) or from
         * _drbd_clear_done_ee.
         */

        /* Wake ee_wait waiters (after the unlock) if the list checked here,
         * sync_ee for ID_SYNCER requests and active_ee otherwise, is empty
         * now that the request was moved to done_ee. */
        do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);

        /* FIXME do we want to detach for failed REQ_OP_DISCARD?
         * ((peer_req->flags & (EE_WAS_ERROR|EE_TRIM)) == EE_WAS_ERROR) */
        if (peer_req->flags & EE_WAS_ERROR)
                __drbd_chk_io_error(device, DRBD_WRITE_ERROR);

        if (connection->cstate >= C_WF_REPORT_PARAMS) {
                kref_get(&device->kref); /* put is in drbd_send_acks_wf() */
                /* queue_work() returned false: the work was already queued,
                 * so give back the reference we just took. */
                if (!queue_work(connection->ack_sender, &peer_device->send_acks_work))
                        kref_put(&device->kref, drbd_destroy_device);
        }
        spin_unlock_irqrestore(&device->resource->req_lock, flags);

        /* From here on only the local copies are used, not peer_req. */
        if (block_id == ID_SYNCER)
                drbd_rs_complete_io(device, i.sector);

        if (do_wake)
                wake_up(&device->ee_wait);

        if (do_al_complete_io)
                drbd_al_complete_io(device, &i);

        put_ldev(device);
}
167
168 /* writes on behalf of the partner, or resync writes,
169  * "submitted" by the receiver.
170  */
171 void drbd_peer_request_endio(struct bio *bio)
172 {
173         struct drbd_peer_request *peer_req = bio->bi_private;
174         struct drbd_device *device = peer_req->peer_device->device;
175         bool is_write = bio_data_dir(bio) == WRITE;
176         bool is_discard = bio_op(bio) == REQ_OP_WRITE_ZEROES ||
177                           bio_op(bio) == REQ_OP_DISCARD;
178
179         if (bio->bi_status && __ratelimit(&drbd_ratelimit_state))
180                 drbd_warn(device, "%s: error=%d s=%llus\n",
181                                 is_write ? (is_discard ? "discard" : "write")
182                                         : "read", bio->bi_status,
183                                 (unsigned long long)peer_req->i.sector);
184
185         if (bio->bi_status)
186                 set_bit(__EE_WAS_ERROR, &peer_req->flags);
187
188         bio_put(bio); /* no need for the bio anymore */
189         if (atomic_dec_and_test(&peer_req->pending_bios)) {
190                 if (is_write)
191                         drbd_endio_write_sec_final(peer_req);
192                 else
193                         drbd_endio_read_sec_final(peer_req);
194         }
195 }
196
197 static void
198 drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device)
199 {
200         panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n",
201                 device->minor, device->resource->name, device->vnr);
202 }
203
204 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
205  */
206 void drbd_request_endio(struct bio *bio)
207 {
208         unsigned long flags;
209         struct drbd_request *req = bio->bi_private;
210         struct drbd_device *device = req->device;
211         struct bio_and_error m;
212         enum drbd_req_event what;
213
214         /* If this request was aborted locally before,
215          * but now was completed "successfully",
216          * chances are that this caused arbitrary data corruption.
217          *
218          * "aborting" requests, or force-detaching the disk, is intended for
219          * completely blocked/hung local backing devices which do no longer
220          * complete requests at all, not even do error completions.  In this
221          * situation, usually a hard-reset and failover is the only way out.
222          *
223          * By "aborting", basically faking a local error-completion,
224          * we allow for a more graceful swichover by cleanly migrating services.
225          * Still the affected node has to be rebooted "soon".
226          *
227          * By completing these requests, we allow the upper layers to re-use
228          * the associated data pages.
229          *
230          * If later the local backing device "recovers", and now DMAs some data
231          * from disk into the original request pages, in the best case it will
232          * just put random data into unused pages; but typically it will corrupt
233          * meanwhile completely unrelated data, causing all sorts of damage.
234          *
235          * Which means delayed successful completion,
236          * especially for READ requests,
237          * is a reason to panic().
238          *
239          * We assume that a delayed *error* completion is OK,
240          * though we still will complain noisily about it.
241          */
242         if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
243                 if (__ratelimit(&drbd_ratelimit_state))
244                         drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
245
246                 if (!bio->bi_status)
247                         drbd_panic_after_delayed_completion_of_aborted_request(device);
248         }
249
250         /* to avoid recursion in __req_mod */
251         if (unlikely(bio->bi_status)) {
252                 switch (bio_op(bio)) {
253                 case REQ_OP_WRITE_ZEROES:
254                 case REQ_OP_DISCARD:
255                         if (bio->bi_status == BLK_STS_NOTSUPP)
256                                 what = DISCARD_COMPLETED_NOTSUPP;
257                         else
258                                 what = DISCARD_COMPLETED_WITH_ERROR;
259                         break;
260                 case REQ_OP_READ:
261                         if (bio->bi_opf & REQ_RAHEAD)
262                                 what = READ_AHEAD_COMPLETED_WITH_ERROR;
263                         else
264                                 what = READ_COMPLETED_WITH_ERROR;
265                         break;
266                 default:
267                         what = WRITE_COMPLETED_WITH_ERROR;
268                         break;
269                 }
270         } else {
271                 what = COMPLETED_OK;
272         }
273
274         req->private_bio = ERR_PTR(blk_status_to_errno(bio->bi_status));
275         bio_put(bio);
276
277         /* not req_mod(), we need irqsave here! */
278         spin_lock_irqsave(&device->resource->req_lock, flags);
279         __req_mod(req, what, &m);
280         spin_unlock_irqrestore(&device->resource->req_lock, flags);
281         put_ldev(device);
282
283         if (m.bio)
284                 complete_master_bio(device, &m);
285 }
286
287 void drbd_csum_ee(struct crypto_shash *tfm, struct drbd_peer_request *peer_req, void *digest)
288 {
289         SHASH_DESC_ON_STACK(desc, tfm);
290         struct page *page = peer_req->pages;
291         struct page *tmp;
292         unsigned len;
293         void *src;
294
295         desc->tfm = tfm;
296
297         crypto_shash_init(desc);
298
299         src = kmap_atomic(page);
300         while ((tmp = page_chain_next(page))) {
301                 /* all but the last page will be fully used */
302                 crypto_shash_update(desc, src, PAGE_SIZE);
303                 kunmap_atomic(src);
304                 page = tmp;
305                 src = kmap_atomic(page);
306         }
307         /* and now the last, possibly only partially used page */
308         len = peer_req->i.size & (PAGE_SIZE - 1);
309         crypto_shash_update(desc, src, len ?: PAGE_SIZE);
310         kunmap_atomic(src);
311
312         crypto_shash_final(desc, digest);
313         shash_desc_zero(desc);
314 }
315
316 void drbd_csum_bio(struct crypto_shash *tfm, struct bio *bio, void *digest)
317 {
318         SHASH_DESC_ON_STACK(desc, tfm);
319         struct bio_vec bvec;
320         struct bvec_iter iter;
321
322         desc->tfm = tfm;
323
324         crypto_shash_init(desc);
325
326         bio_for_each_segment(bvec, bio, iter) {
327                 u8 *src;
328
329                 src = kmap_atomic(bvec.bv_page);
330                 crypto_shash_update(desc, src + bvec.bv_offset, bvec.bv_len);
331                 kunmap_atomic(src);
332
333                 /* REQ_OP_WRITE_SAME has only one segment,
334                  * checksum the payload only once. */
335                 if (bio_op(bio) == REQ_OP_WRITE_SAME)
336                         break;
337         }
338         crypto_shash_final(desc, digest);
339         shash_desc_zero(desc);
340 }
341
342 /* MAYBE merge common code with w_e_end_ov_req */
343 static int w_e_send_csum(struct drbd_work *w, int cancel)
344 {
345         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
346         struct drbd_peer_device *peer_device = peer_req->peer_device;
347         struct drbd_device *device = peer_device->device;
348         int digest_size;
349         void *digest;
350         int err = 0;
351
352         if (unlikely(cancel))
353                 goto out;
354
355         if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
356                 goto out;
357
358         digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm);
359         digest = kmalloc(digest_size, GFP_NOIO);
360         if (digest) {
361                 sector_t sector = peer_req->i.sector;
362                 unsigned int size = peer_req->i.size;
363                 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
364                 /* Free peer_req and pages before send.
365                  * In case we block on congestion, we could otherwise run into
366                  * some distributed deadlock, if the other side blocks on
367                  * congestion as well, because our receiver blocks in
368                  * drbd_alloc_pages due to pp_in_use > max_buffers. */
369                 drbd_free_peer_req(device, peer_req);
370                 peer_req = NULL;
371                 inc_rs_pending(device);
372                 err = drbd_send_drequest_csum(peer_device, sector, size,
373                                               digest, digest_size,
374                                               P_CSUM_RS_REQUEST);
375                 kfree(digest);
376         } else {
377                 drbd_err(device, "kmalloc() of digest failed.\n");
378                 err = -ENOMEM;
379         }
380
381 out:
382         if (peer_req)
383                 drbd_free_peer_req(device, peer_req);
384
385         if (unlikely(err))
386                 drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
387         return err;
388 }
389
390 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
391
392 static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
393 {
394         struct drbd_device *device = peer_device->device;
395         struct drbd_peer_request *peer_req;
396
397         if (!get_ldev(device))
398                 return -EIO;
399
400         /* GFP_TRY, because if there is no memory available right now, this may
401          * be rescheduled for later. It is "only" background resync, after all. */
402         peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
403                                        size, size, GFP_TRY);
404         if (!peer_req)
405                 goto defer;
406
407         peer_req->w.cb = w_e_send_csum;
408         spin_lock_irq(&device->resource->req_lock);
409         list_add_tail(&peer_req->w.list, &device->read_ee);
410         spin_unlock_irq(&device->resource->req_lock);
411
412         atomic_add(size >> 9, &device->rs_sect_ev);
413         if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
414                                      DRBD_FAULT_RS_RD) == 0)
415                 return 0;
416
417         /* If it failed because of ENOMEM, retry should help.  If it failed
418          * because bio_add_page failed (probably broken lower level driver),
419          * retry may or may not help.
420          * If it does not, you may need to force disconnect. */
421         spin_lock_irq(&device->resource->req_lock);
422         list_del(&peer_req->w.list);
423         spin_unlock_irq(&device->resource->req_lock);
424
425         drbd_free_peer_req(device, peer_req);
426 defer:
427         put_ldev(device);
428         return -EAGAIN;
429 }
430
431 int w_resync_timer(struct drbd_work *w, int cancel)
432 {
433         struct drbd_device *device =
434                 container_of(w, struct drbd_device, resync_work);
435
436         switch (device->state.conn) {
437         case C_VERIFY_S:
438                 make_ov_request(device, cancel);
439                 break;
440         case C_SYNC_TARGET:
441                 make_resync_request(device, cancel);
442                 break;
443         }
444
445         return 0;
446 }
447
448 void resync_timer_fn(struct timer_list *t)
449 {
450         struct drbd_device *device = from_timer(device, t, resync_timer);
451
452         drbd_queue_work_if_unqueued(
453                 &first_peer_device(device)->connection->sender_work,
454                 &device->resync_work);
455 }
456
457 static void fifo_set(struct fifo_buffer *fb, int value)
458 {
459         int i;
460
461         for (i = 0; i < fb->size; i++)
462                 fb->values[i] = value;
463 }
464
465 static int fifo_push(struct fifo_buffer *fb, int value)
466 {
467         int ov;
468
469         ov = fb->values[fb->head_index];
470         fb->values[fb->head_index++] = value;
471
472         if (fb->head_index >= fb->size)
473                 fb->head_index = 0;
474
475         return ov;
476 }
477
478 static void fifo_add_val(struct fifo_buffer *fb, int value)
479 {
480         int i;
481
482         for (i = 0; i < fb->size; i++)
483                 fb->values[i] += value;
484 }
485
486 struct fifo_buffer *fifo_alloc(unsigned int fifo_size)
487 {
488         struct fifo_buffer *fb;
489
490         fb = kzalloc(struct_size(fb, values, fifo_size), GFP_NOIO);
491         if (!fb)
492                 return NULL;
493
494         fb->head_index = 0;
495         fb->size = fifo_size;
496         fb->total = 0;
497
498         return fb;
499 }
500
501 static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
502 {
503         struct disk_conf *dc;
504         unsigned int want;     /* The number of sectors we want in-flight */
505         int req_sect; /* Number of sectors to request in this turn */
506         int correction; /* Number of sectors more we need in-flight */
507         int cps; /* correction per invocation of drbd_rs_controller() */
508         int steps; /* Number of time steps to plan ahead */
509         int curr_corr;
510         int max_sect;
511         struct fifo_buffer *plan;
512
513         dc = rcu_dereference(device->ldev->disk_conf);
514         plan = rcu_dereference(device->rs_plan_s);
515
516         steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
517
518         if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
519                 want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
520         } else { /* normal path */
521                 want = dc->c_fill_target ? dc->c_fill_target :
522                         sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
523         }
524
525         correction = want - device->rs_in_flight - plan->total;
526
527         /* Plan ahead */
528         cps = correction / steps;
529         fifo_add_val(plan, cps);
530         plan->total += cps * steps;
531
532         /* What we do in this step */
533         curr_corr = fifo_push(plan, 0);
534         plan->total -= curr_corr;
535
536         req_sect = sect_in + curr_corr;
537         if (req_sect < 0)
538                 req_sect = 0;
539
540         max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
541         if (req_sect > max_sect)
542                 req_sect = max_sect;
543
544         /*
545         drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
546                  sect_in, device->rs_in_flight, want, correction,
547                  steps, cps, device->rs_planed, curr_corr, req_sect);
548         */
549
550         return req_sect;
551 }
552
553 static int drbd_rs_number_requests(struct drbd_device *device)
554 {
555         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
556         int number, mxb;
557
558         sect_in = atomic_xchg(&device->rs_sect_in, 0);
559         device->rs_in_flight -= sect_in;
560
561         rcu_read_lock();
562         mxb = drbd_get_max_buffers(device) / 2;
563         if (rcu_dereference(device->rs_plan_s)->size) {
564                 number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
565                 device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
566         } else {
567                 device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
568                 number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
569         }
570         rcu_read_unlock();
571
572         /* Don't have more than "max-buffers"/2 in-flight.
573          * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
574          * potentially causing a distributed deadlock on congestion during
575          * online-verify or (checksum-based) resync, if max-buffers,
576          * socket buffer sizes and resync rate settings are mis-configured. */
577
578         /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
579          * mxb (as used here, and in drbd_alloc_pages on the peer) is
580          * "number of pages" (typically also 4k),
581          * but "rs_in_flight" is in "sectors" (512 Byte). */
582         if (mxb - device->rs_in_flight/8 < number)
583                 number = mxb - device->rs_in_flight/8;
584
585         return number;
586 }
587
/*
 * Generate the next batch of resync requests; called from w_resync_timer()
 * while the connection state is C_SYNC_TARGET.
 *
 * Scans the bitmap starting at device->bm_resync_fo for set bits, merges
 * adjacent set bits into larger aligned requests where allowed, and for
 * each resulting range either reads the local block for a checksum based
 * request (device->use_csums) or sends a resync data request to the peer.
 * Unless it returns early, it accounts the generated requests in
 * rs_in_flight and re-arms the resync timer.
 *
 * Returns 0 on most paths, -EIO if read_for_csum() reports a disk failure,
 * or the error returned by drbd_send_drequest().  w_resync_timer() does
 * not look at the return value.
 *
 * NOTE(review): "connection" is NULL when first_peer_device() returns NULL
 * (see its initializer), but it is dereferenced below without a check, and
 * peer_device is passed on as well; confirm that a peer device always
 * exists when this runs.
 */
static int make_resync_request(struct drbd_device *const device, int cancel)
{
        struct drbd_peer_device *const peer_device = first_peer_device(device);
        struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
        unsigned long bit;
        sector_t sector;
        const sector_t capacity = get_capacity(device->vdisk);
        int max_bio_size;
        int number, rollback_i, size;
        int align, requeue = 0;
        int i = 0;
        int discard_granularity = 0;

        if (unlikely(cancel))
                return 0;

        if (device->rs_total == 0) {
                /* empty resync? */
                drbd_resync_finished(device);
                return 0;
        }

        if (!get_ldev(device)) {
                /* Since we only need to access device->rsync a
                   get_ldev_if_state(device,D_FAILED) would be sufficient, but
                   to continue resync with a broken disk makes no sense at
                   all */
                drbd_err(device, "Disk broke down during resync!\n");
                return 0;
        }

        /* discard_granularity stays 0 unless the peer agreed to
         * DRBD_FF_THIN_RESYNC */
        if (connection->agreed_features & DRBD_FF_THIN_RESYNC) {
                rcu_read_lock();
                discard_granularity = rcu_dereference(device->ldev->disk_conf)->rs_discard_granularity;
                rcu_read_unlock();
        }

        /* queue limit is in sectors, max_bio_size is in bytes */
        max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
        number = drbd_rs_number_requests(device);
        if (number <= 0)
                goto requeue;

        for (i = 0; i < number; i++) {
                /* Stop generating RS requests when half of the send buffer is filled,
                 * but notify TCP that we'd like to have more space. */
                mutex_lock(&connection->data.mutex);
                if (connection->data.socket) {
                        struct sock *sk = connection->data.socket->sk;
                        int queued = sk->sk_wmem_queued;
                        int sndbuf = sk->sk_sndbuf;
                        if (queued > sndbuf / 2) {
                                requeue = 1;
                                if (sk->sk_socket)
                                        set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
                        }
                } else
                        requeue = 1;
                mutex_unlock(&connection->data.mutex);
                if (requeue)
                        goto requeue;

next_sector:
                size = BM_BLOCK_SIZE;
                bit  = drbd_bm_find_next(device, device->bm_resync_fo);

                if (bit == DRBD_END_OF_BITMAP) {
                        /* no more set bits: return without re-arming the
                         * timer and without adding i to rs_in_flight */
                        device->bm_resync_fo = drbd_bm_bits(device);
                        put_ldev(device);
                        return 0;
                }

                sector = BM_BIT_TO_SECT(bit);

                if (drbd_try_rs_begin_io(device, sector)) {
                        /* could not start on this sector now: remember the
                         * bit and try again on the next timer tick */
                        device->bm_resync_fo = bit;
                        goto requeue;
                }
                device->bm_resync_fo = bit + 1;

                /* the bit is no longer set: nothing to do for this one,
                 * look for the next set bit */
                if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
                        drbd_rs_complete_io(device, sector);
                        goto next_sector;
                }

#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
                /* try to find some adjacent bits.
                 * we stop if we have already the maximum req size.
                 *
                 * Additionally always align bigger requests, in order to
                 * be prepared for all stripe sizes of software RAIDs.
                 */
                align = 1;
                /* value of i to restore if read_for_csum() asks for a retry */
                rollback_i = i;
                while (i < number) {
                        if (size + BM_BLOCK_SIZE > max_bio_size)
                                break;

                        /* Be always aligned */
                        if (sector & ((1<<(align+3))-1))
                                break;

                        if (discard_granularity && size == discard_granularity)
                                break;

                        /* do not cross extent boundaries */
                        if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
                                break;
                        /* now, is it actually dirty, after all?
                         * caution, drbd_bm_test_bit is tri-state for some
                         * obscure reason; ( b == 0 ) would get the out-of-band
                         * only accidentally right because of the "oddly sized"
                         * adjustment below */
                        if (drbd_bm_test_bit(device, bit+1) != 1)
                                break;
                        bit++;
                        size += BM_BLOCK_SIZE;
                        if ((BM_BLOCK_SIZE << align) <= size)
                                align++;
                        /* each merged bit counts as one of the "number"
                         * requests of this turn */
                        i++;
                }
                /* if we merged some,
                 * reset the offset to start the next drbd_bm_find_next from */
                if (size > BM_BLOCK_SIZE)
                        device->bm_resync_fo = bit + 1;
#endif

                /* adjust very last sectors, in case we are oddly sized */
                if (sector + (size>>9) > capacity)
                        size = (capacity-sector)<<9;

                if (device->use_csums) {
                        switch (read_for_csum(peer_device, sector, size)) {
                        case -EIO: /* Disk failure */
                                put_ldev(device);
                                return -EIO;
                        case -EAGAIN: /* allocation failed, or ldev busy */
                                /* undo this request: complete the resync IO
                                 * begun above, rewind the bitmap offset and
                                 * the request counter, retry on next tick */
                                drbd_rs_complete_io(device, sector);
                                device->bm_resync_fo = BM_SECT_TO_BIT(sector);
                                i = rollback_i;
                                goto requeue;
                        case 0:
                                /* everything ok */
                                break;
                        default:
                                BUG();
                        }
                } else {
                        int err;

                        inc_rs_pending(device);
                        /* a request of exactly discard_granularity bytes is
                         * sent as P_RS_THIN_REQ */
                        err = drbd_send_drequest(peer_device,
                                                 size == discard_granularity ? P_RS_THIN_REQ : P_RS_DATA_REQUEST,
                                                 sector, size, ID_SYNCER);
                        if (err) {
                                drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
                                dec_rs_pending(device);
                                put_ldev(device);
                                return err;
                        }
                }
        }

        if (device->bm_resync_fo >= drbd_bm_bits(device)) {
                /* last syncer _request_ was sent,
                 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
                 * next sync group will resume), as soon as we receive the last
                 * resync data block, and the last bit is cleared.
                 * until then resync "work" is "inactive" ...
                 */
                put_ldev(device);
                return 0;
        }

 requeue:
        /* i counts bitmap blocks, rs_in_flight is in 512 byte sectors */
        device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
        mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
        put_ldev(device);
        return 0;
}
767
/*
 * Generate the next batch of online-verify requests; called from
 * w_resync_timer() while the connection state is C_VERIFY_S.
 *
 * Starting at device->ov_position, sends up to
 * drbd_rs_number_requests() requests of BM_BLOCK_SIZE each (the last one
 * shortened to the device capacity), then stores the new position,
 * accounts the requests in rs_in_flight and re-arms the resync timer
 * unless the stop sector was reached.
 *
 * Returns 0 if sending a request failed, 1 otherwise.  w_resync_timer()
 * does not look at the return value.
 */
static int make_ov_request(struct drbd_device *device, int cancel)
{
        int number, i, size;
        sector_t sector;
        const sector_t capacity = get_capacity(device->vdisk);
        bool stop_sector_reached = false;

        if (unlikely(cancel))
                return 1;

        number = drbd_rs_number_requests(device);

        sector = device->ov_position;
        for (i = 0; i < number; i++) {
                /* past the end of the device: this return neither stores
                 * ov_position, nor updates rs_in_flight, nor re-arms the
                 * timer */
                if (sector >= capacity)
                        return 1;

                /* We check for "finished" only in the reply path:
                 * w_e_end_ov_reply().
                 * We need to send at least one request out. */
                stop_sector_reached = i > 0
                        && verify_can_do_stop_sector(device)
                        && sector >= device->ov_stop_sector;
                if (stop_sector_reached)
                        break;

                size = BM_BLOCK_SIZE;

                if (drbd_try_rs_begin_io(device, sector)) {
                        /* could not start on this sector now: remember it
                         * and try again on the next timer tick */
                        device->ov_position = sector;
                        goto requeue;
                }

                /* adjust the very last request, in case we are oddly sized */
                if (sector + (size>>9) > capacity)
                        size = (capacity-sector)<<9;

                inc_rs_pending(device);
                if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
                        /* NOTE(review): returns without storing ov_position
                         * or updating rs_in_flight, and no
                         * drbd_rs_complete_io() for this sector is visible
                         * here; presumably the failed connection gets
                         * cleaned up elsewhere. Confirm. */
                        dec_rs_pending(device);
                        return 0;
                }
                sector += BM_SECT_PER_BIT;
        }
        device->ov_position = sector;

 requeue:
        /* i counts bitmap blocks, rs_in_flight is in 512 byte sectors */
        device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
        /* keep the timer running unless we stopped at the stop sector */
        if (i == 0 || !stop_sector_reached)
                mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
        return 1;
}
819
820 int w_ov_finished(struct drbd_work *w, int cancel)
821 {
822         struct drbd_device_work *dw =
823                 container_of(w, struct drbd_device_work, w);
824         struct drbd_device *device = dw->device;
825         kfree(dw);
826         ov_out_of_sync_print(device);
827         drbd_resync_finished(device);
828
829         return 0;
830 }
831
832 static int w_resync_finished(struct drbd_work *w, int cancel)
833 {
834         struct drbd_device_work *dw =
835                 container_of(w, struct drbd_device_work, w);
836         struct drbd_device *device = dw->device;
837         kfree(dw);
838
839         drbd_resync_finished(device);
840
841         return 0;
842 }
843
844 static void ping_peer(struct drbd_device *device)
845 {
846         struct drbd_connection *connection = first_peer_device(device)->connection;
847
848         clear_bit(GOT_PING_ACK, &connection->flags);
849         request_ping(connection);
850         wait_event(connection->ping_wait,
851                    test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
852 }
853
854 int drbd_resync_finished(struct drbd_device *device)
855 {
856         struct drbd_connection *connection = first_peer_device(device)->connection;
857         unsigned long db, dt, dbdt;
858         unsigned long n_oos;
859         union drbd_state os, ns;
860         struct drbd_device_work *dw;
861         char *khelper_cmd = NULL;
862         int verify_done = 0;
863
864         /* Remove all elements from the resync LRU. Since future actions
865          * might set bits in the (main) bitmap, then the entries in the
866          * resync LRU would be wrong. */
867         if (drbd_rs_del_all(device)) {
868                 /* In case this is not possible now, most probably because
869                  * there are P_RS_DATA_REPLY Packets lingering on the worker's
870                  * queue (or even the read operations for those packets
871                  * is not finished by now).   Retry in 100ms. */
872
873                 schedule_timeout_interruptible(HZ / 10);
874                 dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
875                 if (dw) {
876                         dw->w.cb = w_resync_finished;
877                         dw->device = device;
878                         drbd_queue_work(&connection->sender_work, &dw->w);
879                         return 1;
880                 }
881                 drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
882         }
883
884         dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
885         if (dt <= 0)
886                 dt = 1;
887
888         db = device->rs_total;
889         /* adjust for verify start and stop sectors, respective reached position */
890         if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
891                 db -= device->ov_left;
892
893         dbdt = Bit2KB(db/dt);
894         device->rs_paused /= HZ;
895
896         if (!get_ldev(device))
897                 goto out;
898
899         ping_peer(device);
900
901         spin_lock_irq(&device->resource->req_lock);
902         os = drbd_read_state(device);
903
904         verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
905
906         /* This protects us against multiple calls (that can happen in the presence
907            of application IO), and against connectivity loss just before we arrive here. */
908         if (os.conn <= C_CONNECTED)
909                 goto out_unlock;
910
911         ns = os;
912         ns.conn = C_CONNECTED;
913
914         drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
915              verify_done ? "Online verify" : "Resync",
916              dt + device->rs_paused, device->rs_paused, dbdt);
917
918         n_oos = drbd_bm_total_weight(device);
919
920         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
921                 if (n_oos) {
922                         drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
923                               n_oos, Bit2KB(1));
924                         khelper_cmd = "out-of-sync";
925                 }
926         } else {
927                 D_ASSERT(device, (n_oos - device->rs_failed) == 0);
928
929                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
930                         khelper_cmd = "after-resync-target";
931
932                 if (device->use_csums && device->rs_total) {
933                         const unsigned long s = device->rs_same_csum;
934                         const unsigned long t = device->rs_total;
935                         const int ratio =
936                                 (t == 0)     ? 0 :
937                         (t < 100000) ? ((s*100)/t) : (s/(t/100));
938                         drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
939                              "transferred %luK total %luK\n",
940                              ratio,
941                              Bit2KB(device->rs_same_csum),
942                              Bit2KB(device->rs_total - device->rs_same_csum),
943                              Bit2KB(device->rs_total));
944                 }
945         }
946
947         if (device->rs_failed) {
948                 drbd_info(device, "            %lu failed blocks\n", device->rs_failed);
949
950                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
951                         ns.disk = D_INCONSISTENT;
952                         ns.pdsk = D_UP_TO_DATE;
953                 } else {
954                         ns.disk = D_UP_TO_DATE;
955                         ns.pdsk = D_INCONSISTENT;
956                 }
957         } else {
958                 ns.disk = D_UP_TO_DATE;
959                 ns.pdsk = D_UP_TO_DATE;
960
961                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
962                         if (device->p_uuid) {
963                                 int i;
964                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
965                                         _drbd_uuid_set(device, i, device->p_uuid[i]);
966                                 drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
967                                 _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
968                         } else {
969                                 drbd_err(device, "device->p_uuid is NULL! BUG\n");
970                         }
971                 }
972
973                 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
974                         /* for verify runs, we don't update uuids here,
975                          * so there would be nothing to report. */
976                         drbd_uuid_set_bm(device, 0UL);
977                         drbd_print_uuids(device, "updated UUIDs");
978                         if (device->p_uuid) {
979                                 /* Now the two UUID sets are equal, update what we
980                                  * know of the peer. */
981                                 int i;
982                                 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
983                                         device->p_uuid[i] = device->ldev->md.uuid[i];
984                         }
985                 }
986         }
987
988         _drbd_set_state(device, ns, CS_VERBOSE, NULL);
989 out_unlock:
990         spin_unlock_irq(&device->resource->req_lock);
991
992         /* If we have been sync source, and have an effective fencing-policy,
993          * once *all* volumes are back in sync, call "unfence". */
994         if (os.conn == C_SYNC_SOURCE) {
995                 enum drbd_disk_state disk_state = D_MASK;
996                 enum drbd_disk_state pdsk_state = D_MASK;
997                 enum drbd_fencing_p fp = FP_DONT_CARE;
998
999                 rcu_read_lock();
1000                 fp = rcu_dereference(device->ldev->disk_conf)->fencing;
1001                 if (fp != FP_DONT_CARE) {
1002                         struct drbd_peer_device *peer_device;
1003                         int vnr;
1004                         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1005                                 struct drbd_device *device = peer_device->device;
1006                                 disk_state = min_t(enum drbd_disk_state, disk_state, device->state.disk);
1007                                 pdsk_state = min_t(enum drbd_disk_state, pdsk_state, device->state.pdsk);
1008                         }
1009                 }
1010                 rcu_read_unlock();
1011                 if (disk_state == D_UP_TO_DATE && pdsk_state == D_UP_TO_DATE)
1012                         conn_khelper(connection, "unfence-peer");
1013         }
1014
1015         put_ldev(device);
1016 out:
1017         device->rs_total  = 0;
1018         device->rs_failed = 0;
1019         device->rs_paused = 0;
1020
1021         /* reset start sector, if we reached end of device */
1022         if (verify_done && device->ov_left == 0)
1023                 device->ov_start_sector = 0;
1024
1025         drbd_md_sync(device);
1026
1027         if (khelper_cmd)
1028                 drbd_khelper(device, khelper_cmd);
1029
1030         return 1;
1031 }
1032
1033 /* helper */
1034 static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
1035 {
1036         if (drbd_peer_req_has_active_page(peer_req)) {
1037                 /* This might happen if sendpage() has not finished */
1038                 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
1039                 atomic_add(i, &device->pp_in_use_by_net);
1040                 atomic_sub(i, &device->pp_in_use);
1041                 spin_lock_irq(&device->resource->req_lock);
1042                 list_add_tail(&peer_req->w.list, &device->net_ee);
1043                 spin_unlock_irq(&device->resource->req_lock);
1044                 wake_up(&drbd_pp_wait);
1045         } else
1046                 drbd_free_peer_req(device, peer_req);
1047 }
1048
1049 /**
1050  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
1051  * @w:          work object.
1052  * @cancel:     The connection will be closed anyways
1053  */
1054 int w_e_end_data_req(struct drbd_work *w, int cancel)
1055 {
1056         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1057         struct drbd_peer_device *peer_device = peer_req->peer_device;
1058         struct drbd_device *device = peer_device->device;
1059         int err;
1060
1061         if (unlikely(cancel)) {
1062                 drbd_free_peer_req(device, peer_req);
1063                 dec_unacked(device);
1064                 return 0;
1065         }
1066
1067         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1068                 err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1069         } else {
1070                 if (__ratelimit(&drbd_ratelimit_state))
1071                         drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1072                             (unsigned long long)peer_req->i.sector);
1073
1074                 err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1075         }
1076
1077         dec_unacked(device);
1078
1079         move_to_net_ee_or_free(device, peer_req);
1080
1081         if (unlikely(err))
1082                 drbd_err(device, "drbd_send_block() failed\n");
1083         return err;
1084 }
1085
1086 static bool all_zero(struct drbd_peer_request *peer_req)
1087 {
1088         struct page *page = peer_req->pages;
1089         unsigned int len = peer_req->i.size;
1090
1091         page_chain_for_each(page) {
1092                 unsigned int l = min_t(unsigned int, len, PAGE_SIZE);
1093                 unsigned int i, words = l / sizeof(long);
1094                 unsigned long *d;
1095
1096                 d = kmap_atomic(page);
1097                 for (i = 0; i < words; i++) {
1098                         if (d[i]) {
1099                                 kunmap_atomic(d);
1100                                 return false;
1101                         }
1102                 }
1103                 kunmap_atomic(d);
1104                 len -= l;
1105         }
1106
1107         return true;
1108 }
1109
1110 /**
1111  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1112  * @w:          work object.
1113  * @cancel:     The connection will be closed anyways
1114  */
1115 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1116 {
1117         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1118         struct drbd_peer_device *peer_device = peer_req->peer_device;
1119         struct drbd_device *device = peer_device->device;
1120         int err;
1121
1122         if (unlikely(cancel)) {
1123                 drbd_free_peer_req(device, peer_req);
1124                 dec_unacked(device);
1125                 return 0;
1126         }
1127
1128         if (get_ldev_if_state(device, D_FAILED)) {
1129                 drbd_rs_complete_io(device, peer_req->i.sector);
1130                 put_ldev(device);
1131         }
1132
1133         if (device->state.conn == C_AHEAD) {
1134                 err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1135         } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1136                 if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1137                         inc_rs_pending(device);
1138                         if (peer_req->flags & EE_RS_THIN_REQ && all_zero(peer_req))
1139                                 err = drbd_send_rs_deallocated(peer_device, peer_req);
1140                         else
1141                                 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1142                 } else {
1143                         if (__ratelimit(&drbd_ratelimit_state))
1144                                 drbd_err(device, "Not sending RSDataReply, "
1145                                     "partner DISKLESS!\n");
1146                         err = 0;
1147                 }
1148         } else {
1149                 if (__ratelimit(&drbd_ratelimit_state))
1150                         drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1151                             (unsigned long long)peer_req->i.sector);
1152
1153                 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1154
1155                 /* update resync data with failure */
1156                 drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1157         }
1158
1159         dec_unacked(device);
1160
1161         move_to_net_ee_or_free(device, peer_req);
1162
1163         if (unlikely(err))
1164                 drbd_err(device, "drbd_send_block() failed\n");
1165         return err;
1166 }
1167
1168 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1169 {
1170         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1171         struct drbd_peer_device *peer_device = peer_req->peer_device;
1172         struct drbd_device *device = peer_device->device;
1173         struct digest_info *di;
1174         int digest_size;
1175         void *digest = NULL;
1176         int err, eq = 0;
1177
1178         if (unlikely(cancel)) {
1179                 drbd_free_peer_req(device, peer_req);
1180                 dec_unacked(device);
1181                 return 0;
1182         }
1183
1184         if (get_ldev(device)) {
1185                 drbd_rs_complete_io(device, peer_req->i.sector);
1186                 put_ldev(device);
1187         }
1188
1189         di = peer_req->digest;
1190
1191         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1192                 /* quick hack to try to avoid a race against reconfiguration.
1193                  * a real fix would be much more involved,
1194                  * introducing more locking mechanisms */
1195                 if (peer_device->connection->csums_tfm) {
1196                         digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm);
1197                         D_ASSERT(device, digest_size == di->digest_size);
1198                         digest = kmalloc(digest_size, GFP_NOIO);
1199                 }
1200                 if (digest) {
1201                         drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1202                         eq = !memcmp(digest, di->digest, digest_size);
1203                         kfree(digest);
1204                 }
1205
1206                 if (eq) {
1207                         drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1208                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1209                         device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1210                         err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1211                 } else {
1212                         inc_rs_pending(device);
1213                         peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1214                         peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1215                         kfree(di);
1216                         err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1217                 }
1218         } else {
1219                 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1220                 if (__ratelimit(&drbd_ratelimit_state))
1221                         drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1222         }
1223
1224         dec_unacked(device);
1225         move_to_net_ee_or_free(device, peer_req);
1226
1227         if (unlikely(err))
1228                 drbd_err(device, "drbd_send_block/ack() failed\n");
1229         return err;
1230 }
1231
1232 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1233 {
1234         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1235         struct drbd_peer_device *peer_device = peer_req->peer_device;
1236         struct drbd_device *device = peer_device->device;
1237         sector_t sector = peer_req->i.sector;
1238         unsigned int size = peer_req->i.size;
1239         int digest_size;
1240         void *digest;
1241         int err = 0;
1242
1243         if (unlikely(cancel))
1244                 goto out;
1245
1246         digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm);
1247         digest = kmalloc(digest_size, GFP_NOIO);
1248         if (!digest) {
1249                 err = 1;        /* terminate the connection in case the allocation failed */
1250                 goto out;
1251         }
1252
1253         if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1254                 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1255         else
1256                 memset(digest, 0, digest_size);
1257
1258         /* Free e and pages before send.
1259          * In case we block on congestion, we could otherwise run into
1260          * some distributed deadlock, if the other side blocks on
1261          * congestion as well, because our receiver blocks in
1262          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1263         drbd_free_peer_req(device, peer_req);
1264         peer_req = NULL;
1265         inc_rs_pending(device);
1266         err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1267         if (err)
1268                 dec_rs_pending(device);
1269         kfree(digest);
1270
1271 out:
1272         if (peer_req)
1273                 drbd_free_peer_req(device, peer_req);
1274         dec_unacked(device);
1275         return err;
1276 }
1277
1278 void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1279 {
1280         if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1281                 device->ov_last_oos_size += size>>9;
1282         } else {
1283                 device->ov_last_oos_start = sector;
1284                 device->ov_last_oos_size = size>>9;
1285         }
1286         drbd_set_out_of_sync(device, sector, size);
1287 }
1288
1289 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1290 {
1291         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1292         struct drbd_peer_device *peer_device = peer_req->peer_device;
1293         struct drbd_device *device = peer_device->device;
1294         struct digest_info *di;
1295         void *digest;
1296         sector_t sector = peer_req->i.sector;
1297         unsigned int size = peer_req->i.size;
1298         int digest_size;
1299         int err, eq = 0;
1300         bool stop_sector_reached = false;
1301
1302         if (unlikely(cancel)) {
1303                 drbd_free_peer_req(device, peer_req);
1304                 dec_unacked(device);
1305                 return 0;
1306         }
1307
1308         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1309          * the resync lru has been cleaned up already */
1310         if (get_ldev(device)) {
1311                 drbd_rs_complete_io(device, peer_req->i.sector);
1312                 put_ldev(device);
1313         }
1314
1315         di = peer_req->digest;
1316
1317         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1318                 digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm);
1319                 digest = kmalloc(digest_size, GFP_NOIO);
1320                 if (digest) {
1321                         drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1322
1323                         D_ASSERT(device, digest_size == di->digest_size);
1324                         eq = !memcmp(digest, di->digest, digest_size);
1325                         kfree(digest);
1326                 }
1327         }
1328
1329         /* Free peer_req and pages before send.
1330          * In case we block on congestion, we could otherwise run into
1331          * some distributed deadlock, if the other side blocks on
1332          * congestion as well, because our receiver blocks in
1333          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1334         drbd_free_peer_req(device, peer_req);
1335         if (!eq)
1336                 drbd_ov_out_of_sync_found(device, sector, size);
1337         else
1338                 ov_out_of_sync_print(device);
1339
1340         err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1341                                eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1342
1343         dec_unacked(device);
1344
1345         --device->ov_left;
1346
1347         /* let's advance progress step marks only for every other megabyte */
1348         if ((device->ov_left & 0x200) == 0x200)
1349                 drbd_advance_rs_marks(device, device->ov_left);
1350
1351         stop_sector_reached = verify_can_do_stop_sector(device) &&
1352                 (sector + (size>>9)) >= device->ov_stop_sector;
1353
1354         if (device->ov_left == 0 || stop_sector_reached) {
1355                 ov_out_of_sync_print(device);
1356                 drbd_resync_finished(device);
1357         }
1358
1359         return err;
1360 }
1361
1362 /* FIXME
1363  * We need to track the number of pending barrier acks,
1364  * and to be able to wait for them.
1365  * See also comment in drbd_adm_attach before drbd_suspend_io.
1366  */
1367 static int drbd_send_barrier(struct drbd_connection *connection)
1368 {
1369         struct p_barrier *p;
1370         struct drbd_socket *sock;
1371
1372         sock = &connection->data;
1373         p = conn_prepare_command(connection, sock);
1374         if (!p)
1375                 return -EIO;
1376         p->barrier = connection->send.current_epoch_nr;
1377         p->pad = 0;
1378         connection->send.current_epoch_writes = 0;
1379         connection->send.last_sent_barrier_jif = jiffies;
1380
1381         return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1382 }
1383
1384 static int pd_send_unplug_remote(struct drbd_peer_device *pd)
1385 {
1386         struct drbd_socket *sock = &pd->connection->data;
1387         if (!drbd_prepare_command(pd, sock))
1388                 return -EIO;
1389         return drbd_send_command(pd, sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1390 }
1391
1392 int w_send_write_hint(struct drbd_work *w, int cancel)
1393 {
1394         struct drbd_device *device =
1395                 container_of(w, struct drbd_device, unplug_work);
1396
1397         if (cancel)
1398                 return 0;
1399         return pd_send_unplug_remote(first_peer_device(device));
1400 }
1401
1402 static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1403 {
1404         if (!connection->send.seen_any_write_yet) {
1405                 connection->send.seen_any_write_yet = true;
1406                 connection->send.current_epoch_nr = epoch;
1407                 connection->send.current_epoch_writes = 0;
1408                 connection->send.last_sent_barrier_jif = jiffies;
1409         }
1410 }
1411
1412 static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1413 {
1414         /* re-init if first write on this connection */
1415         if (!connection->send.seen_any_write_yet)
1416                 return;
1417         if (connection->send.current_epoch_nr != epoch) {
1418                 if (connection->send.current_epoch_writes)
1419                         drbd_send_barrier(connection);
1420                 connection->send.current_epoch_nr = epoch;
1421         }
1422 }
1423
1424 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1425 {
1426         struct drbd_request *req = container_of(w, struct drbd_request, w);
1427         struct drbd_device *device = req->device;
1428         struct drbd_peer_device *const peer_device = first_peer_device(device);
1429         struct drbd_connection *const connection = peer_device->connection;
1430         int err;
1431
1432         if (unlikely(cancel)) {
1433                 req_mod(req, SEND_CANCELED);
1434                 return 0;
1435         }
1436         req->pre_send_jif = jiffies;
1437
1438         /* this time, no connection->send.current_epoch_writes++;
1439          * If it was sent, it was the closing barrier for the last
1440          * replicated epoch, before we went into AHEAD mode.
1441          * No more barriers will be sent, until we leave AHEAD mode again. */
1442         maybe_send_barrier(connection, req->epoch);
1443
1444         err = drbd_send_out_of_sync(peer_device, req);
1445         req_mod(req, OOS_HANDED_TO_NETWORK);
1446
1447         return err;
1448 }
1449
1450 /**
1451  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1452  * @w:          work object.
1453  * @cancel:     The connection will be closed anyways
1454  */
1455 int w_send_dblock(struct drbd_work *w, int cancel)
1456 {
1457         struct drbd_request *req = container_of(w, struct drbd_request, w);
1458         struct drbd_device *device = req->device;
1459         struct drbd_peer_device *const peer_device = first_peer_device(device);
1460         struct drbd_connection *connection = peer_device->connection;
1461         bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1462         int err;
1463
1464         if (unlikely(cancel)) {
1465                 req_mod(req, SEND_CANCELED);
1466                 return 0;
1467         }
1468         req->pre_send_jif = jiffies;
1469
1470         re_init_if_first_write(connection, req->epoch);
1471         maybe_send_barrier(connection, req->epoch);
1472         connection->send.current_epoch_writes++;
1473
1474         err = drbd_send_dblock(peer_device, req);
1475         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1476
1477         if (do_send_unplug && !err)
1478                 pd_send_unplug_remote(peer_device);
1479
1480         return err;
1481 }
1482
1483 /**
1484  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1485  * @w:          work object.
1486  * @cancel:     The connection will be closed anyways
1487  */
1488 int w_send_read_req(struct drbd_work *w, int cancel)
1489 {
1490         struct drbd_request *req = container_of(w, struct drbd_request, w);
1491         struct drbd_device *device = req->device;
1492         struct drbd_peer_device *const peer_device = first_peer_device(device);
1493         struct drbd_connection *connection = peer_device->connection;
1494         bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1495         int err;
1496
1497         if (unlikely(cancel)) {
1498                 req_mod(req, SEND_CANCELED);
1499                 return 0;
1500         }
1501         req->pre_send_jif = jiffies;
1502
1503         /* Even read requests may close a write epoch,
1504          * if there was any yet. */
1505         maybe_send_barrier(connection, req->epoch);
1506
1507         err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1508                                  (unsigned long)req);
1509
1510         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1511
1512         if (do_send_unplug && !err)
1513                 pd_send_unplug_remote(peer_device);
1514
1515         return err;
1516 }
1517
1518 int w_restart_disk_io(struct drbd_work *w, int cancel)
1519 {
1520         struct drbd_request *req = container_of(w, struct drbd_request, w);
1521         struct drbd_device *device = req->device;
1522
1523         if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1524                 drbd_al_begin_io(device, &req->i);
1525
1526         drbd_req_make_private_bio(req, req->master_bio);
1527         bio_set_dev(req->private_bio, device->ldev->backing_bdev);
1528         submit_bio_noacct(req->private_bio);
1529
1530         return 0;
1531 }
1532
1533 static int _drbd_may_sync_now(struct drbd_device *device)
1534 {
1535         struct drbd_device *odev = device;
1536         int resync_after;
1537
1538         while (1) {
1539                 if (!odev->ldev || odev->state.disk == D_DISKLESS)
1540                         return 1;
1541                 rcu_read_lock();
1542                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1543                 rcu_read_unlock();
1544                 if (resync_after == -1)
1545                         return 1;
1546                 odev = minor_to_device(resync_after);
1547                 if (!odev)
1548                         return 1;
1549                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1550                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1551                     odev->state.aftr_isp || odev->state.peer_isp ||
1552                     odev->state.user_isp)
1553                         return 0;
1554         }
1555 }
1556
1557 /**
1558  * drbd_pause_after() - Pause resync on all devices that may not resync now
1559  * @device:     DRBD device.
1560  *
1561  * Called from process context only (admin command and after_state_ch).
1562  */
1563 static bool drbd_pause_after(struct drbd_device *device)
1564 {
1565         bool changed = false;
1566         struct drbd_device *odev;
1567         int i;
1568
1569         rcu_read_lock();
1570         idr_for_each_entry(&drbd_devices, odev, i) {
1571                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1572                         continue;
1573                 if (!_drbd_may_sync_now(odev) &&
1574                     _drbd_set_state(_NS(odev, aftr_isp, 1),
1575                                     CS_HARD, NULL) != SS_NOTHING_TO_DO)
1576                         changed = true;
1577         }
1578         rcu_read_unlock();
1579
1580         return changed;
1581 }
1582
1583 /**
1584  * drbd_resume_next() - Resume resync on all devices that may resync now
1585  * @device:     DRBD device.
1586  *
1587  * Called from process context only (admin command and worker).
1588  */
1589 static bool drbd_resume_next(struct drbd_device *device)
1590 {
1591         bool changed = false;
1592         struct drbd_device *odev;
1593         int i;
1594
1595         rcu_read_lock();
1596         idr_for_each_entry(&drbd_devices, odev, i) {
1597                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1598                         continue;
1599                 if (odev->state.aftr_isp) {
1600                         if (_drbd_may_sync_now(odev) &&
1601                             _drbd_set_state(_NS(odev, aftr_isp, 0),
1602                                             CS_HARD, NULL) != SS_NOTHING_TO_DO)
1603                                 changed = true;
1604                 }
1605         }
1606         rcu_read_unlock();
1607         return changed;
1608 }
1609
/*
 * Run drbd_resume_next() with all resources locked.
 * The result of drbd_resume_next() is not used.
 */
void resume_next_sg(struct drbd_device *device)
{
	lock_all_resources();
	drbd_resume_next(device);
	unlock_all_resources();
}
1616
/*
 * Run drbd_pause_after() with all resources locked.
 * The result of drbd_pause_after() is not used.
 */
void suspend_other_sg(struct drbd_device *device)
{
	lock_all_resources();
	drbd_pause_after(device);
	unlock_all_resources();
}
1623
1624 /* caller must lock_all_resources() */
1625 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1626 {
1627         struct drbd_device *odev;
1628         int resync_after;
1629
1630         if (o_minor == -1)
1631                 return NO_ERROR;
1632         if (o_minor < -1 || o_minor > MINORMASK)
1633                 return ERR_RESYNC_AFTER;
1634
1635         /* check for loops */
1636         odev = minor_to_device(o_minor);
1637         while (1) {
1638                 if (odev == device)
1639                         return ERR_RESYNC_AFTER_CYCLE;
1640
1641                 /* You are free to depend on diskless, non-existing,
1642                  * or not yet/no longer existing minors.
1643                  * We only reject dependency loops.
1644                  * We cannot follow the dependency chain beyond a detached or
1645                  * missing minor.
1646                  */
1647                 if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1648                         return NO_ERROR;
1649
1650                 rcu_read_lock();
1651                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1652                 rcu_read_unlock();
1653                 /* dependency chain ends here, no cycles. */
1654                 if (resync_after == -1)
1655                         return NO_ERROR;
1656
1657                 /* follow the dependency chain */
1658                 odev = minor_to_device(resync_after);
1659         }
1660 }
1661
1662 /* caller must lock_all_resources() */
1663 void drbd_resync_after_changed(struct drbd_device *device)
1664 {
1665         int changed;
1666
1667         do {
1668                 changed  = drbd_pause_after(device);
1669                 changed |= drbd_resume_next(device);
1670         } while (changed);
1671 }
1672
1673 void drbd_rs_controller_reset(struct drbd_device *device)
1674 {
1675         struct gendisk *disk = device->ldev->backing_bdev->bd_disk;
1676         struct fifo_buffer *plan;
1677
1678         atomic_set(&device->rs_sect_in, 0);
1679         atomic_set(&device->rs_sect_ev, 0);
1680         device->rs_in_flight = 0;
1681         device->rs_last_events = (int)part_stat_read_accum(&disk->part0, sectors);
1682
1683         /* Updating the RCU protected object in place is necessary since
1684            this function gets called from atomic context.
1685            It is valid since all other updates also lead to an completely
1686            empty fifo */
1687         rcu_read_lock();
1688         plan = rcu_dereference(device->rs_plan_s);
1689         plan->total = 0;
1690         fifo_set(plan, 0);
1691         rcu_read_unlock();
1692 }
1693
1694 void start_resync_timer_fn(struct timer_list *t)
1695 {
1696         struct drbd_device *device = from_timer(device, t, start_resync_timer);
1697         drbd_device_post_work(device, RS_START);
1698 }
1699
1700 static void do_start_resync(struct drbd_device *device)
1701 {
1702         if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1703                 drbd_warn(device, "postponing start_resync ...\n");
1704                 device->start_resync_timer.expires = jiffies + HZ/10;
1705                 add_timer(&device->start_resync_timer);
1706                 return;
1707         }
1708
1709         drbd_start_resync(device, C_SYNC_SOURCE);
1710         clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1711 }
1712
1713 static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1714 {
1715         bool csums_after_crash_only;
1716         rcu_read_lock();
1717         csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1718         rcu_read_unlock();
1719         return connection->agreed_pro_version >= 89 &&          /* supported? */
1720                 connection->csums_tfm &&                        /* configured? */
1721                 (csums_after_crash_only == false                /* use for each resync? */
1722                  || test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */
1723 }
1724
1725 /**
1726  * drbd_start_resync() - Start the resync process
1727  * @device:     DRBD device.
1728  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1729  *
1730  * This function might bring you directly into one of the
1731  * C_PAUSED_SYNC_* states.
1732  */
1733 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1734 {
1735         struct drbd_peer_device *peer_device = first_peer_device(device);
1736         struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1737         union drbd_state ns;
1738         int r;
1739
1740         if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1741                 drbd_err(device, "Resync already running!\n");
1742                 return;
1743         }
1744
1745         if (!connection) {
1746                 drbd_err(device, "No connection to peer, aborting!\n");
1747                 return;
1748         }
1749
1750         if (!test_bit(B_RS_H_DONE, &device->flags)) {
1751                 if (side == C_SYNC_TARGET) {
1752                         /* Since application IO was locked out during C_WF_BITMAP_T and
1753                            C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1754                            we check that we might make the data inconsistent. */
1755                         r = drbd_khelper(device, "before-resync-target");
1756                         r = (r >> 8) & 0xff;
1757                         if (r > 0) {
1758                                 drbd_info(device, "before-resync-target handler returned %d, "
1759                                          "dropping connection.\n", r);
1760                                 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1761                                 return;
1762                         }
1763                 } else /* C_SYNC_SOURCE */ {
1764                         r = drbd_khelper(device, "before-resync-source");
1765                         r = (r >> 8) & 0xff;
1766                         if (r > 0) {
1767                                 if (r == 3) {
1768                                         drbd_info(device, "before-resync-source handler returned %d, "
1769                                                  "ignoring. Old userland tools?", r);
1770                                 } else {
1771                                         drbd_info(device, "before-resync-source handler returned %d, "
1772                                                  "dropping connection.\n", r);
1773                                         conn_request_state(connection,
1774                                                            NS(conn, C_DISCONNECTING), CS_HARD);
1775                                         return;
1776                                 }
1777                         }
1778                 }
1779         }
1780
1781         if (current == connection->worker.task) {
1782                 /* The worker should not sleep waiting for state_mutex,
1783                    that can take long */
1784                 if (!mutex_trylock(device->state_mutex)) {
1785                         set_bit(B_RS_H_DONE, &device->flags);
1786                         device->start_resync_timer.expires = jiffies + HZ/5;
1787                         add_timer(&device->start_resync_timer);
1788                         return;
1789                 }
1790         } else {
1791                 mutex_lock(device->state_mutex);
1792         }
1793
1794         lock_all_resources();
1795         clear_bit(B_RS_H_DONE, &device->flags);
1796         /* Did some connection breakage or IO error race with us? */
1797         if (device->state.conn < C_CONNECTED
1798         || !get_ldev_if_state(device, D_NEGOTIATING)) {
1799                 unlock_all_resources();
1800                 goto out;
1801         }
1802
1803         ns = drbd_read_state(device);
1804
1805         ns.aftr_isp = !_drbd_may_sync_now(device);
1806
1807         ns.conn = side;
1808
1809         if (side == C_SYNC_TARGET)
1810                 ns.disk = D_INCONSISTENT;
1811         else /* side == C_SYNC_SOURCE */
1812                 ns.pdsk = D_INCONSISTENT;
1813
1814         r = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
1815         ns = drbd_read_state(device);
1816
1817         if (ns.conn < C_CONNECTED)
1818                 r = SS_UNKNOWN_ERROR;
1819
1820         if (r == SS_SUCCESS) {
1821                 unsigned long tw = drbd_bm_total_weight(device);
1822                 unsigned long now = jiffies;
1823                 int i;
1824
1825                 device->rs_failed    = 0;
1826                 device->rs_paused    = 0;
1827                 device->rs_same_csum = 0;
1828                 device->rs_last_sect_ev = 0;
1829                 device->rs_total     = tw;
1830                 device->rs_start     = now;
1831                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1832                         device->rs_mark_left[i] = tw;
1833                         device->rs_mark_time[i] = now;
1834                 }
1835                 drbd_pause_after(device);
1836                 /* Forget potentially stale cached per resync extent bit-counts.
1837                  * Open coded drbd_rs_cancel_all(device), we already have IRQs
1838                  * disabled, and know the disk state is ok. */
1839                 spin_lock(&device->al_lock);
1840                 lc_reset(device->resync);
1841                 device->resync_locked = 0;
1842                 device->resync_wenr = LC_FREE;
1843                 spin_unlock(&device->al_lock);
1844         }
1845         unlock_all_resources();
1846
1847         if (r == SS_SUCCESS) {
1848                 wake_up(&device->al_wait); /* for lc_reset() above */
1849                 /* reset rs_last_bcast when a resync or verify is started,
1850                  * to deal with potential jiffies wrap. */
1851                 device->rs_last_bcast = jiffies - HZ;
1852
1853                 drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1854                      drbd_conn_str(ns.conn),
1855                      (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1856                      (unsigned long) device->rs_total);
1857                 if (side == C_SYNC_TARGET) {
1858                         device->bm_resync_fo = 0;
1859                         device->use_csums = use_checksum_based_resync(connection, device);
1860                 } else {
1861                         device->use_csums = false;
1862                 }
1863
1864                 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1865                  * with w_send_oos, or the sync target will get confused as to
1866                  * how much bits to resync.  We cannot do that always, because for an
1867                  * empty resync and protocol < 95, we need to do it here, as we call
1868                  * drbd_resync_finished from here in that case.
1869                  * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1870                  * and from after_state_ch otherwise. */
1871                 if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1872                         drbd_gen_and_send_sync_uuid(peer_device);
1873
1874                 if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1875                         /* This still has a race (about when exactly the peers
1876                          * detect connection loss) that can lead to a full sync
1877                          * on next handshake. In 8.3.9 we fixed this with explicit
1878                          * resync-finished notifications, but the fix
1879                          * introduces a protocol change.  Sleeping for some
1880                          * time longer than the ping interval + timeout on the
1881                          * SyncSource, to give the SyncTarget the chance to
1882                          * detect connection loss, then waiting for a ping
1883                          * response (implicit in drbd_resync_finished) reduces
1884                          * the race considerably, but does not solve it. */
1885                         if (side == C_SYNC_SOURCE) {
1886                                 struct net_conf *nc;
1887                                 int timeo;
1888
1889                                 rcu_read_lock();
1890                                 nc = rcu_dereference(connection->net_conf);
1891                                 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1892                                 rcu_read_unlock();
1893                                 schedule_timeout_interruptible(timeo);
1894                         }
1895                         drbd_resync_finished(device);
1896                 }
1897
1898                 drbd_rs_controller_reset(device);
1899                 /* ns.conn may already be != device->state.conn,
1900                  * we may have been paused in between, or become paused until
1901                  * the timer triggers.
1902                  * No matter, that is handled in resync_timer_fn() */
1903                 if (ns.conn == C_SYNC_TARGET)
1904                         mod_timer(&device->resync_timer, jiffies);
1905
1906                 drbd_md_sync(device);
1907         }
1908         put_ldev(device);
1909 out:
1910         mutex_unlock(device->state_mutex);
1911 }
1912
1913 static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
1914 {
1915         struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1916         device->rs_last_bcast = jiffies;
1917
1918         if (!get_ldev(device))
1919                 return;
1920
1921         drbd_bm_write_lazy(device, 0);
1922         if (resync_done && is_sync_state(device->state.conn))
1923                 drbd_resync_finished(device);
1924
1925         drbd_bcast_event(device, &sib);
1926         /* update timestamp, in case it took a while to write out stuff */
1927         device->rs_last_bcast = jiffies;
1928         put_ldev(device);
1929 }
1930
1931 static void drbd_ldev_destroy(struct drbd_device *device)
1932 {
1933         lc_destroy(device->resync);
1934         device->resync = NULL;
1935         lc_destroy(device->act_log);
1936         device->act_log = NULL;
1937
1938         __acquire(local);
1939         drbd_backing_dev_free(device, device->ldev);
1940         device->ldev = NULL;
1941         __release(local);
1942
1943         clear_bit(GOING_DISKLESS, &device->flags);
1944         wake_up(&device->misc_wait);
1945 }
1946
1947 static void go_diskless(struct drbd_device *device)
1948 {
1949         D_ASSERT(device, device->state.disk == D_FAILED);
1950         /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1951          * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1952          * the protected members anymore, though, so once put_ldev reaches zero
1953          * again, it will be safe to free them. */
1954
1955         /* Try to write changed bitmap pages, read errors may have just
1956          * set some bits outside the area covered by the activity log.
1957          *
1958          * If we have an IO error during the bitmap writeout,
1959          * we will want a full sync next time, just in case.
1960          * (Do we want a specific meta data flag for this?)
1961          *
1962          * If that does not make it to stable storage either,
1963          * we cannot do anything about that anymore.
1964          *
1965          * We still need to check if both bitmap and ldev are present, we may
1966          * end up here after a failed attach, before ldev was even assigned.
1967          */
1968         if (device->bitmap && device->ldev) {
1969                 /* An interrupted resync or similar is allowed to recounts bits
1970                  * while we detach.
1971                  * Any modifications would not be expected anymore, though.
1972                  */
1973                 if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1974                                         "detach", BM_LOCKED_TEST_ALLOWED)) {
1975                         if (test_bit(WAS_READ_ERROR, &device->flags)) {
1976                                 drbd_md_set_flag(device, MDF_FULL_SYNC);
1977                                 drbd_md_sync(device);
1978                         }
1979                 }
1980         }
1981
1982         drbd_force_state(device, NS(disk, D_DISKLESS));
1983 }
1984
/*
 * Device work for MD_SYNC: log a warning, then sync the meta data from
 * worker context.  Always returns 0.
 */
static int do_md_sync(struct drbd_device *device)
{
	drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
	drbd_md_sync(device);

	return 0;
}
1991
1992 /* only called from drbd_worker thread, no locking */
1993 void __update_timing_details(
1994                 struct drbd_thread_timing_details *tdp,
1995                 unsigned int *cb_nr,
1996                 void *cb,
1997                 const char *fn, const unsigned int line)
1998 {
1999         unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
2000         struct drbd_thread_timing_details *td = tdp + i;
2001
2002         td->start_jif = jiffies;
2003         td->cb_addr = cb;
2004         td->caller_fn = fn;
2005         td->line = line;
2006         td->cb_nr = *cb_nr;
2007
2008         i = (i+1) % DRBD_THREAD_DETAILS_HIST;
2009         td = tdp + i;
2010         memset(td, 0, sizeof(*td));
2011
2012         ++(*cb_nr);
2013 }
2014
2015 static void do_device_work(struct drbd_device *device, const unsigned long todo)
2016 {
2017         if (test_bit(MD_SYNC, &todo))
2018                 do_md_sync(device);
2019         if (test_bit(RS_DONE, &todo) ||
2020             test_bit(RS_PROGRESS, &todo))
2021                 update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
2022         if (test_bit(GO_DISKLESS, &todo))
2023                 go_diskless(device);
2024         if (test_bit(DESTROY_DISK, &todo))
2025                 drbd_ldev_destroy(device);
2026         if (test_bit(RS_START, &todo))
2027                 do_start_resync(device);
2028 }
2029
/*
 * All bits of device->flags that stand for work do_device_work() knows
 * how to handle.
 */
#define DRBD_DEVICE_WORK_MASK		\
	((1UL << GO_DISKLESS)	|	\
	 (1UL << DESTROY_DISK)	|	\
	 (1UL << MD_SYNC)	|	\
	 (1UL << RS_START)	|	\
	 (1UL << RS_PROGRESS)	|	\
	 (1UL << RS_DONE))
2038
2039 static unsigned long get_work_bits(unsigned long *flags)
2040 {
2041         unsigned long old, new;
2042         do {
2043                 old = *flags;
2044                 new = old & ~DRBD_DEVICE_WORK_MASK;
2045         } while (cmpxchg(flags, old, new) != old);
2046         return old & DRBD_DEVICE_WORK_MASK;
2047 }
2048
2049 static void do_unqueued_work(struct drbd_connection *connection)
2050 {
2051         struct drbd_peer_device *peer_device;
2052         int vnr;
2053
2054         rcu_read_lock();
2055         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2056                 struct drbd_device *device = peer_device->device;
2057                 unsigned long todo = get_work_bits(&device->flags);
2058                 if (!todo)
2059                         continue;
2060
2061                 kref_get(&device->kref);
2062                 rcu_read_unlock();
2063                 do_device_work(device, todo);
2064                 kref_put(&device->kref, drbd_destroy_device);
2065                 rcu_read_lock();
2066         }
2067         rcu_read_unlock();
2068 }
2069
2070 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
2071 {
2072         spin_lock_irq(&queue->q_lock);
2073         list_splice_tail_init(&queue->q, work_list);
2074         spin_unlock_irq(&queue->q_lock);
2075         return !list_empty(work_list);
2076 }
2077
2078 static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
2079 {
2080         DEFINE_WAIT(wait);
2081         struct net_conf *nc;
2082         int uncork, cork;
2083
2084         dequeue_work_batch(&connection->sender_work, work_list);
2085         if (!list_empty(work_list))
2086                 return;
2087
2088         /* Still nothing to do?
2089          * Maybe we still need to close the current epoch,
2090          * even if no new requests are queued yet.
2091          *
2092          * Also, poke TCP, just in case.
2093          * Then wait for new work (or signal). */
2094         rcu_read_lock();
2095         nc = rcu_dereference(connection->net_conf);
2096         uncork = nc ? nc->tcp_cork : 0;
2097         rcu_read_unlock();
2098         if (uncork) {
2099                 mutex_lock(&connection->data.mutex);
2100                 if (connection->data.socket)
2101                         tcp_sock_set_cork(connection->data.socket->sk, false);
2102                 mutex_unlock(&connection->data.mutex);
2103         }
2104
2105         for (;;) {
2106                 int send_barrier;
2107                 prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
2108                 spin_lock_irq(&connection->resource->req_lock);
2109                 spin_lock(&connection->sender_work.q_lock);     /* FIXME get rid of this one? */
2110                 if (!list_empty(&connection->sender_work.q))
2111                         list_splice_tail_init(&connection->sender_work.q, work_list);
2112                 spin_unlock(&connection->sender_work.q_lock);   /* FIXME get rid of this one? */
2113                 if (!list_empty(work_list) || signal_pending(current)) {
2114                         spin_unlock_irq(&connection->resource->req_lock);
2115                         break;
2116                 }
2117
2118                 /* We found nothing new to do, no to-be-communicated request,
2119                  * no other work item.  We may still need to close the last
2120                  * epoch.  Next incoming request epoch will be connection ->
2121                  * current transfer log epoch number.  If that is different
2122                  * from the epoch of the last request we communicated, it is
2123                  * safe to send the epoch separating barrier now.
2124                  */
2125                 send_barrier =
2126                         atomic_read(&connection->current_tle_nr) !=
2127                         connection->send.current_epoch_nr;
2128                 spin_unlock_irq(&connection->resource->req_lock);
2129
2130                 if (send_barrier)
2131                         maybe_send_barrier(connection,
2132                                         connection->send.current_epoch_nr + 1);
2133
2134                 if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
2135                         break;
2136
2137                 /* drbd_send() may have called flush_signals() */
2138                 if (get_t_state(&connection->worker) != RUNNING)
2139                         break;
2140
2141                 schedule();
2142                 /* may be woken up for other things but new work, too,
2143                  * e.g. if the current epoch got closed.
2144                  * In which case we send the barrier above. */
2145         }
2146         finish_wait(&connection->sender_work.q_wait, &wait);
2147
2148         /* someone may have changed the config while we have been waiting above. */
2149         rcu_read_lock();
2150         nc = rcu_dereference(connection->net_conf);
2151         cork = nc ? nc->tcp_cork : 0;
2152         rcu_read_unlock();
2153         mutex_lock(&connection->data.mutex);
2154         if (connection->data.socket) {
2155                 if (cork)
2156                         tcp_sock_set_cork(connection->data.socket->sk, true);
2157                 else if (!uncork)
2158                         tcp_sock_set_cork(connection->data.socket->sk, false);
2159         }
2160         mutex_unlock(&connection->data.mutex);
2161 }
2162
2163 int drbd_worker(struct drbd_thread *thi)
2164 {
2165         struct drbd_connection *connection = thi->connection;
2166         struct drbd_work *w = NULL;
2167         struct drbd_peer_device *peer_device;
2168         LIST_HEAD(work_list);
2169         int vnr;
2170
2171         while (get_t_state(thi) == RUNNING) {
2172                 drbd_thread_current_set_cpu(thi);
2173
2174                 if (list_empty(&work_list)) {
2175                         update_worker_timing_details(connection, wait_for_work);
2176                         wait_for_work(connection, &work_list);
2177                 }
2178
2179                 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2180                         update_worker_timing_details(connection, do_unqueued_work);
2181                         do_unqueued_work(connection);
2182                 }
2183
2184                 if (signal_pending(current)) {
2185                         flush_signals(current);
2186                         if (get_t_state(thi) == RUNNING) {
2187                                 drbd_warn(connection, "Worker got an unexpected signal\n");
2188                                 continue;
2189                         }
2190                         break;
2191                 }
2192
2193                 if (get_t_state(thi) != RUNNING)
2194                         break;
2195
2196                 if (!list_empty(&work_list)) {
2197                         w = list_first_entry(&work_list, struct drbd_work, list);
2198                         list_del_init(&w->list);
2199                         update_worker_timing_details(connection, w->cb);
2200                         if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
2201                                 continue;
2202                         if (connection->cstate >= C_WF_REPORT_PARAMS)
2203                                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
2204                 }
2205         }
2206
2207         do {
2208                 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2209                         update_worker_timing_details(connection, do_unqueued_work);
2210                         do_unqueued_work(connection);
2211                 }
2212                 if (!list_empty(&work_list)) {
2213                         w = list_first_entry(&work_list, struct drbd_work, list);
2214                         list_del_init(&w->list);
2215                         update_worker_timing_details(connection, w->cb);
2216                         w->cb(w, 1);
2217                 } else
2218                         dequeue_work_batch(&connection->sender_work, &work_list);
2219         } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
2220
2221         rcu_read_lock();
2222         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2223                 struct drbd_device *device = peer_device->device;
2224                 D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2225                 kref_get(&device->kref);
2226                 rcu_read_unlock();
2227                 drbd_device_cleanup(device);
2228                 kref_put(&device->kref, drbd_destroy_device);
2229                 rcu_read_lock();
2230         }
2231         rcu_read_unlock();
2232
2233         return 0;
2234 }