1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /*
3    drbd_worker.c
4
5    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
6
7    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
8    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
9    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
10
11
12 */
13
14 #include <linux/module.h>
15 #include <linux/drbd.h>
16 #include <linux/sched/signal.h>
17 #include <linux/wait.h>
18 #include <linux/mm.h>
19 #include <linux/memcontrol.h>
20 #include <linux/mm_inline.h>
21 #include <linux/slab.h>
22 #include <linux/random.h>
23 #include <linux/string.h>
24 #include <linux/scatterlist.h>
25 #include <linux/part_stat.h>
26
27 #include "drbd_int.h"
28 #include "drbd_protocol.h"
29 #include "drbd_req.h"
30
31 static int make_ov_request(struct drbd_device *, int);
32 static int make_resync_request(struct drbd_device *, int);
33
34 /* endio handlers:
35  *   drbd_md_endio (defined here)
36  *   drbd_request_endio (defined here)
37  *   drbd_peer_request_endio (defined here)
38  *   drbd_bm_endio (defined in drbd_bitmap.c)
39  *
40  * For all these callbacks, note the following:
41  * The callbacks will be called in irq context by the IDE drivers,
42  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
43  * Try to get the locking right :)
44  *
45  */
46
47 /* used for synchronous metadata and bitmap IO
48  * submitted by drbd_md_sync_page_io()
49  */
50 void drbd_md_endio(struct bio *bio)
51 {
52         struct drbd_device *device;
53
54         device = bio->bi_private;
55         device->md_io.error = blk_status_to_errno(bio->bi_status);
56
57         /* special case: drbd_md_read() during drbd_adm_attach() */
58         if (device->ldev)
59                 put_ldev(device);
60         bio_put(bio);
61
62         /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
63          * to timeout on the lower level device, and eventually detach from it.
64          * If this io completion runs after that timeout expired, this
65          * drbd_md_put_buffer() may allow us to finally try and re-attach.
66          * During normal operation, this only puts that extra reference
67          * down to 1 again.
68          * Make sure we first drop the reference, and only then signal
69          * completion, or we may (in drbd_al_read_log()) cycle so fast into the
70          * next drbd_md_sync_page_io() that we trigger the
71          * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
72          */
73         drbd_md_put_buffer(device);
74         device->md_io.done = 1;
75         wake_up(&device->misc_wait);
76 }
77
78 /* reads on behalf of the partner,
79  * "submitted" by the receiver
80  */
81 static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
82 {
83         unsigned long flags = 0;
84         struct drbd_peer_device *peer_device = peer_req->peer_device;
85         struct drbd_device *device = peer_device->device;
86
87         spin_lock_irqsave(&device->resource->req_lock, flags);
88         device->read_cnt += peer_req->i.size >> 9;
89         list_del(&peer_req->w.list);
90         if (list_empty(&device->read_ee))
91                 wake_up(&device->ee_wait);
92         if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
93                 __drbd_chk_io_error(device, DRBD_READ_ERROR);
94         spin_unlock_irqrestore(&device->resource->req_lock, flags);
95
96         drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
97         put_ldev(device);
98 }
99
100 /* writes on behalf of the partner, or resync writes,
101  * "submitted" by the receiver, final stage.  */
102 void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
103 {
104         unsigned long flags = 0;
105         struct drbd_peer_device *peer_device = peer_req->peer_device;
106         struct drbd_device *device = peer_device->device;
107         struct drbd_connection *connection = peer_device->connection;
108         struct drbd_interval i;
109         int do_wake;
110         u64 block_id;
111         int do_al_complete_io;
112
113         /* after we moved peer_req to done_ee,
114          * we may no longer access it,
115          * it may be freed/reused already!
116          * (as soon as we release the req_lock) */
117         i = peer_req->i;
118         do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
119         block_id = peer_req->block_id;
120         peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
121
122         if (peer_req->flags & EE_WAS_ERROR) {
123                 /* In protocols other than C, we usually do not send write acks.
124                  * In case of a write error, send the neg ack anyway. */
125                 if (!__test_and_set_bit(__EE_SEND_WRITE_ACK, &peer_req->flags))
126                         inc_unacked(device);
127                 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
128         }
129
130         spin_lock_irqsave(&device->resource->req_lock, flags);
131         device->writ_cnt += peer_req->i.size >> 9;
132         list_move_tail(&peer_req->w.list, &device->done_ee);
133
134         /*
135          * Do not remove from the write_requests tree here: we did not send the
136          * Ack yet and did not wake possibly waiting conflicting requests.
137          * Removal from the tree happens in "drbd_process_done_ee", within the
138          * appropriate dw.cb (e_end_block/e_end_resync_block), or in
139          * _drbd_clear_done_ee.
140          */
141
142         do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
143
144         /* FIXME do we want to detach for failed REQ_OP_DISCARD?
145          * ((peer_req->flags & (EE_WAS_ERROR|EE_TRIM)) == EE_WAS_ERROR) */
146         if (peer_req->flags & EE_WAS_ERROR)
147                 __drbd_chk_io_error(device, DRBD_WRITE_ERROR);
148
149         if (connection->cstate >= C_WF_REPORT_PARAMS) {
150                 kref_get(&device->kref); /* put is in drbd_send_acks_wf() */
151                 if (!queue_work(connection->ack_sender, &peer_device->send_acks_work))
152                         kref_put(&device->kref, drbd_destroy_device);
153         }
154         spin_unlock_irqrestore(&device->resource->req_lock, flags);
155
156         if (block_id == ID_SYNCER)
157                 drbd_rs_complete_io(device, i.sector);
158
159         if (do_wake)
160                 wake_up(&device->ee_wait);
161
162         if (do_al_complete_io)
163                 drbd_al_complete_io(device, &i);
164
165         put_ldev(device);
166 }
167
168 /* writes on behalf of the partner, or resync writes,
169  * "submitted" by the receiver.
170  */
171 void drbd_peer_request_endio(struct bio *bio)
172 {
173         struct drbd_peer_request *peer_req = bio->bi_private;
174         struct drbd_device *device = peer_req->peer_device->device;
175         bool is_write = bio_data_dir(bio) == WRITE;
176         bool is_discard = bio_op(bio) == REQ_OP_WRITE_ZEROES ||
177                           bio_op(bio) == REQ_OP_DISCARD;
178
179         if (bio->bi_status && __ratelimit(&drbd_ratelimit_state))
180                 drbd_warn(device, "%s: error=%d s=%llus\n",
181                                 is_write ? (is_discard ? "discard" : "write")
182                                         : "read", bio->bi_status,
183                                 (unsigned long long)peer_req->i.sector);
184
185         if (bio->bi_status)
186                 set_bit(__EE_WAS_ERROR, &peer_req->flags);
187
188         bio_put(bio); /* no need for the bio anymore */
189         if (atomic_dec_and_test(&peer_req->pending_bios)) {
190                 if (is_write)
191                         drbd_endio_write_sec_final(peer_req);
192                 else
193                         drbd_endio_read_sec_final(peer_req);
194         }
195 }
196
197 static void
198 drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device)
199 {
200         panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n",
201                 device->minor, device->resource->name, device->vnr);
202 }
203
204 /* read, read-ahead or write requests on R_PRIMARY coming from drbd_make_request
205  */
206 void drbd_request_endio(struct bio *bio)
207 {
208         unsigned long flags;
209         struct drbd_request *req = bio->bi_private;
210         struct drbd_device *device = req->device;
211         struct bio_and_error m;
212         enum drbd_req_event what;
213
214         /* If this request was aborted locally before,
215          * but now was completed "successfully",
216          * chances are that this caused arbitrary data corruption.
217          *
218          * "aborting" requests, or force-detaching the disk, is intended for
219          * completely blocked/hung local backing devices which no longer
220          * complete requests at all, not even with error completions.  In this
221          * situation, usually a hard-reset and failover is the only way out.
222          *
223          * By "aborting", basically faking a local error-completion,
224          * we allow for a more graceful switchover by cleanly migrating services.
225          * Still the affected node has to be rebooted "soon".
226          *
227          * By completing these requests, we allow the upper layers to re-use
228          * the associated data pages.
229          *
230          * If later the local backing device "recovers", and now DMAs some data
231          * from disk into the original request pages, in the best case it will
232          * just put random data into unused pages; but typically it will corrupt
233          * meanwhile completely unrelated data, causing all sorts of damage.
234          *
235          * Which means delayed successful completion,
236          * especially for READ requests,
237          * is a reason to panic().
238          *
239          * We assume that a delayed *error* completion is OK,
240          * though we will still complain noisily about it.
241          */
242         if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
243                 if (__ratelimit(&drbd_ratelimit_state))
244                         drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
245
246                 if (!bio->bi_status)
247                         drbd_panic_after_delayed_completion_of_aborted_request(device);
248         }
249
250         /* to avoid recursion in __req_mod */
251         if (unlikely(bio->bi_status)) {
252                 switch (bio_op(bio)) {
253                 case REQ_OP_WRITE_ZEROES:
254                 case REQ_OP_DISCARD:
255                         if (bio->bi_status == BLK_STS_NOTSUPP)
256                                 what = DISCARD_COMPLETED_NOTSUPP;
257                         else
258                                 what = DISCARD_COMPLETED_WITH_ERROR;
259                         break;
260                 case REQ_OP_READ:
261                         if (bio->bi_opf & REQ_RAHEAD)
262                                 what = READ_AHEAD_COMPLETED_WITH_ERROR;
263                         else
264                                 what = READ_COMPLETED_WITH_ERROR;
265                         break;
266                 default:
267                         what = WRITE_COMPLETED_WITH_ERROR;
268                         break;
269                 }
270         } else {
271                 what = COMPLETED_OK;
272         }
273
274         req->private_bio = ERR_PTR(blk_status_to_errno(bio->bi_status));
275         bio_put(bio);
276
277         /* not req_mod(), we need irqsave here! */
278         spin_lock_irqsave(&device->resource->req_lock, flags);
279         __req_mod(req, what, &m);
280         spin_unlock_irqrestore(&device->resource->req_lock, flags);
281         put_ldev(device);
282
283         if (m.bio)
284                 complete_master_bio(device, &m);
285 }
286
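/* Compute a digest over the payload of a peer request using the given
 * shash transform.  Walks the page chain, feeding whole pages into the
 * digest; only the last page may be partially used (i.size modulo
 * PAGE_SIZE). */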
287 void drbd_csum_ee(struct crypto_shash *tfm, struct drbd_peer_request *peer_req, void *digest)
288 {
289         SHASH_DESC_ON_STACK(desc, tfm);
290         struct page *page = peer_req->pages;
291         struct page *tmp;
292         unsigned len;
293         void *src;
294
295         desc->tfm = tfm;
296
297         crypto_shash_init(desc);
298
299         src = kmap_atomic(page);
300         while ((tmp = page_chain_next(page))) {
301                 /* all but the last page will be fully used */
302                 crypto_shash_update(desc, src, PAGE_SIZE);
303                 kunmap_atomic(src);
304                 page = tmp;
305                 src = kmap_atomic(page);
306         }
307         /* and now the last, possibly only partially used page */
308         len = peer_req->i.size & (PAGE_SIZE - 1);
309         crypto_shash_update(desc, src, len ?: PAGE_SIZE);
310         kunmap_atomic(src);
311
312         crypto_shash_final(desc, digest);
313         shash_desc_zero(desc);
314 }
315
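/* Like drbd_csum_ee(), but for a bio: hash every segment of the bio into
 * the supplied digest buffer. */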
316 void drbd_csum_bio(struct crypto_shash *tfm, struct bio *bio, void *digest)
317 {
318         SHASH_DESC_ON_STACK(desc, tfm);
319         struct bio_vec bvec;
320         struct bvec_iter iter;
321
322         desc->tfm = tfm;
323
324         crypto_shash_init(desc);
325
326         bio_for_each_segment(bvec, bio, iter) {
327                 u8 *src;
328
329                 src = bvec_kmap_local(&bvec);
330                 crypto_shash_update(desc, src, bvec.bv_len);
331                 kunmap_local(src);
332         }
333         crypto_shash_final(desc, digest);
334         shash_desc_zero(desc);
335 }
336
337 /* MAYBE merge common code with w_e_end_ov_req */
338 static int w_e_send_csum(struct drbd_work *w, int cancel)
339 {
340         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
341         struct drbd_peer_device *peer_device = peer_req->peer_device;
342         struct drbd_device *device = peer_device->device;
343         int digest_size;
344         void *digest;
345         int err = 0;
346
347         if (unlikely(cancel))
348                 goto out;
349
350         if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
351                 goto out;
352
353         digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm);
354         digest = kmalloc(digest_size, GFP_NOIO);
355         if (digest) {
356                 sector_t sector = peer_req->i.sector;
357                 unsigned int size = peer_req->i.size;
358                 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
359                 /* Free peer_req and pages before send.
360                  * In case we block on congestion, we could otherwise run into
361                  * some distributed deadlock, if the other side blocks on
362                  * congestion as well, because our receiver blocks in
363                  * drbd_alloc_pages due to pp_in_use > max_buffers. */
364                 drbd_free_peer_req(device, peer_req);
365                 peer_req = NULL;
366                 inc_rs_pending(device);
367                 err = drbd_send_drequest_csum(peer_device, sector, size,
368                                               digest, digest_size,
369                                               P_CSUM_RS_REQUEST);
370                 kfree(digest);
371         } else {
372                 drbd_err(device, "kmalloc() of digest failed.\n");
373                 err = -ENOMEM;
374         }
375
376 out:
377         if (peer_req)
378                 drbd_free_peer_req(device, peer_req);
379
380         if (unlikely(err))
381                 drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
382         return err;
383 }
384
385 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
386
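/* Submit a local read for a checksum-based resync request.  On completion,
 * w_e_send_csum() computes the digest and sends it to the peer.
 * Returns 0 on success, -EIO if we have no local disk, and -EAGAIN if the
 * request could not be allocated or submitted and should be retried later. */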
387 static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
388 {
389         struct drbd_device *device = peer_device->device;
390         struct drbd_peer_request *peer_req;
391
392         if (!get_ldev(device))
393                 return -EIO;
394
395         /* GFP_TRY, because if there is no memory available right now, this may
396          * be rescheduled for later. It is "only" background resync, after all. */
397         peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
398                                        size, size, GFP_TRY);
399         if (!peer_req)
400                 goto defer;
401
402         peer_req->w.cb = w_e_send_csum;
403         spin_lock_irq(&device->resource->req_lock);
404         list_add_tail(&peer_req->w.list, &device->read_ee);
405         spin_unlock_irq(&device->resource->req_lock);
406
407         atomic_add(size >> 9, &device->rs_sect_ev);
408         if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
409                                      DRBD_FAULT_RS_RD) == 0)
410                 return 0;
411
412         /* If it failed because of ENOMEM, retry should help.  If it failed
413          * because bio_add_page failed (probably broken lower level driver),
414          * retry may or may not help.
415          * If it does not, you may need to force disconnect. */
416         spin_lock_irq(&device->resource->req_lock);
417         list_del(&peer_req->w.list);
418         spin_unlock_irq(&device->resource->req_lock);
419
420         drbd_free_peer_req(device, peer_req);
421 defer:
422         put_ldev(device);
423         return -EAGAIN;
424 }
425
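/* Worker callback driven by the resync timer: depending on the current
 * connection state, generate the next batch of online-verify or resync
 * requests. */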
426 int w_resync_timer(struct drbd_work *w, int cancel)
427 {
428         struct drbd_device *device =
429                 container_of(w, struct drbd_device, resync_work);
430
431         switch (device->state.conn) {
432         case C_VERIFY_S:
433                 make_ov_request(device, cancel);
434                 break;
435         case C_SYNC_TARGET:
436                 make_resync_request(device, cancel);
437                 break;
438         }
439
440         return 0;
441 }
442
443 void resync_timer_fn(struct timer_list *t)
444 {
445         struct drbd_device *device = from_timer(device, t, resync_timer);
446
447         drbd_queue_work_if_unqueued(
448                 &first_peer_device(device)->connection->sender_work,
449                 &device->resync_work);
450 }
451
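/* Helpers for the fifo_buffer used by the resync-speed controller below:
 * fifo_set() fills all slots with one value, fifo_push() inserts a new
 * value at the head and returns the value it displaced, and fifo_add_val()
 * adds a constant to every slot. */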
452 static void fifo_set(struct fifo_buffer *fb, int value)
453 {
454         int i;
455
456         for (i = 0; i < fb->size; i++)
457                 fb->values[i] = value;
458 }
459
460 static int fifo_push(struct fifo_buffer *fb, int value)
461 {
462         int ov;
463
464         ov = fb->values[fb->head_index];
465         fb->values[fb->head_index++] = value;
466
467         if (fb->head_index >= fb->size)
468                 fb->head_index = 0;
469
470         return ov;
471 }
472
473 static void fifo_add_val(struct fifo_buffer *fb, int value)
474 {
475         int i;
476
477         for (i = 0; i < fb->size; i++)
478                 fb->values[i] += value;
479 }
480
481 struct fifo_buffer *fifo_alloc(unsigned int fifo_size)
482 {
483         struct fifo_buffer *fb;
484
485         fb = kzalloc(struct_size(fb, values, fifo_size), GFP_NOIO);
486         if (!fb)
487                 return NULL;
488
489         fb->head_index = 0;
490         fb->size = fifo_size;
491         fb->total = 0;
492
493         return fb;
494 }
495
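/* Dynamic resync-speed controller: given the number of sectors that came
 * in since the last invocation (sect_in), plan how many sectors to request
 * next so that the configured fill target (c_fill_target, or the amount
 * derived from c_delay_target) stays in flight, capped by c_max_rate.
 * Called with the RCU read lock held, since it dereferences disk_conf and
 * rs_plan_s. */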
496 static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
497 {
498         struct disk_conf *dc;
499         unsigned int want;     /* The number of sectors we want in-flight */
500         int req_sect; /* Number of sectors to request in this turn */
501         int correction; /* Number of sectors more we need in-flight */
502         int cps; /* correction per invocation of drbd_rs_controller() */
503         int steps; /* Number of time steps to plan ahead */
504         int curr_corr;
505         int max_sect;
506         struct fifo_buffer *plan;
507
508         dc = rcu_dereference(device->ldev->disk_conf);
509         plan = rcu_dereference(device->rs_plan_s);
510
511         steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
512
513         if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
514                 want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
515         } else { /* normal path */
516                 want = dc->c_fill_target ? dc->c_fill_target :
517                         sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
518         }
519
520         correction = want - device->rs_in_flight - plan->total;
521
522         /* Plan ahead */
523         cps = correction / steps;
524         fifo_add_val(plan, cps);
525         plan->total += cps * steps;
526
527         /* What we do in this step */
528         curr_corr = fifo_push(plan, 0);
529         plan->total -= curr_corr;
530
531         req_sect = sect_in + curr_corr;
532         if (req_sect < 0)
533                 req_sect = 0;
534
535         max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
536         if (req_sect > max_sect)
537                 req_sect = max_sect;
538
539         /*
540         drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
541                  sect_in, device->rs_in_flight, want, correction,
542                  steps, cps, device->rs_planed, curr_corr, req_sect);
543         */
544
545         return req_sect;
546 }
547
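/* Decide how many resync requests (in units of BM_BLOCK_SIZE) to issue in
 * this turn, either from the fixed resync_rate or from the controller
 * above, further limited below to at most max-buffers/2 worth of data
 * in flight. */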
548 static int drbd_rs_number_requests(struct drbd_device *device)
549 {
550         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
551         int number, mxb;
552
553         sect_in = atomic_xchg(&device->rs_sect_in, 0);
554         device->rs_in_flight -= sect_in;
555
556         rcu_read_lock();
557         mxb = drbd_get_max_buffers(device) / 2;
558         if (rcu_dereference(device->rs_plan_s)->size) {
559                 number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
560                 device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
561         } else {
562                 device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
563                 number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
564         }
565         rcu_read_unlock();
566
567         /* Don't have more than "max-buffers"/2 in-flight.
568          * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
569          * potentially causing a distributed deadlock on congestion during
570          * online-verify or (checksum-based) resync, if max-buffers,
571          * socket buffer sizes and resync rate settings are mis-configured. */
572
573         /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
574          * mxb (as used here, and in drbd_alloc_pages on the peer) is
575          * "number of pages" (typically also 4k),
576          * but "rs_in_flight" is in "sectors" (512 Byte). */
577         if (mxb - device->rs_in_flight/8 < number)
578                 number = mxb - device->rs_in_flight/8;
579
580         return number;
581 }
582
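/* Scan the bitmap for out-of-sync blocks and send the corresponding resync
 * requests to the peer (P_RS_DATA_REQUEST, P_RS_THIN_REQ, or checksum-based
 * requests via read_for_csum()), throttled by drbd_rs_number_requests()
 * and by the fill level of the data socket's send buffer. */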
583 static int make_resync_request(struct drbd_device *const device, int cancel)
584 {
585         struct drbd_peer_device *const peer_device = first_peer_device(device);
586         struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
587         unsigned long bit;
588         sector_t sector;
589         const sector_t capacity = get_capacity(device->vdisk);
590         int max_bio_size;
591         int number, rollback_i, size;
592         int align, requeue = 0;
593         int i = 0;
594         int discard_granularity = 0;
595
596         if (unlikely(cancel))
597                 return 0;
598
599         if (device->rs_total == 0) {
600                 /* empty resync? */
601                 drbd_resync_finished(device);
602                 return 0;
603         }
604
605         if (!get_ldev(device)) {
606                 /* Since we only need to access device->rsync, a
607                    get_ldev_if_state(device, D_FAILED) would be sufficient; but
608                    continuing resync with a broken disk makes no sense at
609                    all. */
610                 drbd_err(device, "Disk broke down during resync!\n");
611                 return 0;
612         }
613
614         if (connection->agreed_features & DRBD_FF_THIN_RESYNC) {
615                 rcu_read_lock();
616                 discard_granularity = rcu_dereference(device->ldev->disk_conf)->rs_discard_granularity;
617                 rcu_read_unlock();
618         }
619
620         max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
621         number = drbd_rs_number_requests(device);
622         if (number <= 0)
623                 goto requeue;
624
625         for (i = 0; i < number; i++) {
626                 /* Stop generating RS requests when half of the send buffer is filled,
627                  * but notify TCP that we'd like to have more space. */
628                 mutex_lock(&connection->data.mutex);
629                 if (connection->data.socket) {
630                         struct sock *sk = connection->data.socket->sk;
631                         int queued = sk->sk_wmem_queued;
632                         int sndbuf = sk->sk_sndbuf;
633                         if (queued > sndbuf / 2) {
634                                 requeue = 1;
635                                 if (sk->sk_socket)
636                                         set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
637                         }
638                 } else
639                         requeue = 1;
640                 mutex_unlock(&connection->data.mutex);
641                 if (requeue)
642                         goto requeue;
643
644 next_sector:
645                 size = BM_BLOCK_SIZE;
646                 bit  = drbd_bm_find_next(device, device->bm_resync_fo);
647
648                 if (bit == DRBD_END_OF_BITMAP) {
649                         device->bm_resync_fo = drbd_bm_bits(device);
650                         put_ldev(device);
651                         return 0;
652                 }
653
654                 sector = BM_BIT_TO_SECT(bit);
655
656                 if (drbd_try_rs_begin_io(device, sector)) {
657                         device->bm_resync_fo = bit;
658                         goto requeue;
659                 }
660                 device->bm_resync_fo = bit + 1;
661
662                 if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
663                         drbd_rs_complete_io(device, sector);
664                         goto next_sector;
665                 }
666
667 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
668                 /* try to find some adjacent bits.
669                  * we stop if we already have the maximum req size.
670                  *
671                  * Additionally always align bigger requests, in order to
672                  * be prepared for all stripe sizes of software RAIDs.
673                  */
674                 align = 1;
675                 rollback_i = i;
676                 while (i < number) {
677                         if (size + BM_BLOCK_SIZE > max_bio_size)
678                                 break;
679
680                         /* Always stay aligned */
681                         if (sector & ((1<<(align+3))-1))
682                                 break;
683
684                         if (discard_granularity && size == discard_granularity)
685                                 break;
686
687                         /* do not cross extent boundaries */
688                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
689                                 break;
690                         /* now, is it actually dirty, after all?
691                          * caution, drbd_bm_test_bit is tri-state for some
692                          * obscure reason; ( b == 0 ) would get the out-of-band
693                          * only accidentally right because of the "oddly sized"
694                          * adjustment below */
695                         if (drbd_bm_test_bit(device, bit+1) != 1)
696                                 break;
697                         bit++;
698                         size += BM_BLOCK_SIZE;
699                         if ((BM_BLOCK_SIZE << align) <= size)
700                                 align++;
701                         i++;
702                 }
703                 /* if we merged some,
704                  * reset the offset to start the next drbd_bm_find_next from */
705                 if (size > BM_BLOCK_SIZE)
706                         device->bm_resync_fo = bit + 1;
707 #endif
708
709                 /* adjust very last sectors, in case we are oddly sized */
710                 if (sector + (size>>9) > capacity)
711                         size = (capacity-sector)<<9;
712
713                 if (device->use_csums) {
714                         switch (read_for_csum(peer_device, sector, size)) {
715                         case -EIO: /* Disk failure */
716                                 put_ldev(device);
717                                 return -EIO;
718                         case -EAGAIN: /* allocation failed, or ldev busy */
719                                 drbd_rs_complete_io(device, sector);
720                                 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
721                                 i = rollback_i;
722                                 goto requeue;
723                         case 0:
724                                 /* everything ok */
725                                 break;
726                         default:
727                                 BUG();
728                         }
729                 } else {
730                         int err;
731
732                         inc_rs_pending(device);
733                         err = drbd_send_drequest(peer_device,
734                                                  size == discard_granularity ? P_RS_THIN_REQ : P_RS_DATA_REQUEST,
735                                                  sector, size, ID_SYNCER);
736                         if (err) {
737                                 drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
738                                 dec_rs_pending(device);
739                                 put_ldev(device);
740                                 return err;
741                         }
742                 }
743         }
744
745         if (device->bm_resync_fo >= drbd_bm_bits(device)) {
746                 /* last syncer _request_ was sent,
747                  * but the P_RS_DATA_REPLY not yet received.  sync will end (and
748                  * next sync group will resume), as soon as we receive the last
749                  * resync data block, and the last bit is cleared.
750                  * until then resync "work" is "inactive" ...
751                  */
752                 put_ldev(device);
753                 return 0;
754         }
755
756  requeue:
757         device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
758         mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
759         put_ldev(device);
760         return 0;
761 }
762
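/* Generate the next batch of online-verify requests via
 * drbd_send_ov_request(), starting at device->ov_position and honoring an
 * optional stop sector. */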
763 static int make_ov_request(struct drbd_device *device, int cancel)
764 {
765         int number, i, size;
766         sector_t sector;
767         const sector_t capacity = get_capacity(device->vdisk);
768         bool stop_sector_reached = false;
769
770         if (unlikely(cancel))
771                 return 1;
772
773         number = drbd_rs_number_requests(device);
774
775         sector = device->ov_position;
776         for (i = 0; i < number; i++) {
777                 if (sector >= capacity)
778                         return 1;
779
780                 /* We check for "finished" only in the reply path:
781                  * w_e_end_ov_reply().
782                  * We need to send at least one request out. */
783                 stop_sector_reached = i > 0
784                         && verify_can_do_stop_sector(device)
785                         && sector >= device->ov_stop_sector;
786                 if (stop_sector_reached)
787                         break;
788
789                 size = BM_BLOCK_SIZE;
790
791                 if (drbd_try_rs_begin_io(device, sector)) {
792                         device->ov_position = sector;
793                         goto requeue;
794                 }
795
796                 if (sector + (size>>9) > capacity)
797                         size = (capacity-sector)<<9;
798
799                 inc_rs_pending(device);
800                 if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
801                         dec_rs_pending(device);
802                         return 0;
803                 }
804                 sector += BM_SECT_PER_BIT;
805         }
806         device->ov_position = sector;
807
808  requeue:
809         device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
810         if (i == 0 || !stop_sector_reached)
811                 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
812         return 1;
813 }
814
815 int w_ov_finished(struct drbd_work *w, int cancel)
816 {
817         struct drbd_device_work *dw =
818                 container_of(w, struct drbd_device_work, w);
819         struct drbd_device *device = dw->device;
820         kfree(dw);
821         ov_out_of_sync_print(device);
822         drbd_resync_finished(device);
823
824         return 0;
825 }
826
827 static int w_resync_finished(struct drbd_work *w, int cancel)
828 {
829         struct drbd_device_work *dw =
830                 container_of(w, struct drbd_device_work, w);
831         struct drbd_device *device = dw->device;
832         kfree(dw);
833
834         drbd_resync_finished(device);
835
836         return 0;
837 }
838
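/* Ping the peer and wait until the ping ack arrives or the connection is
 * lost. */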
839 static void ping_peer(struct drbd_device *device)
840 {
841         struct drbd_connection *connection = first_peer_device(device)->connection;
842
843         clear_bit(GOT_PING_ACK, &connection->flags);
844         request_ping(connection);
845         wait_event(connection->ping_wait,
846                    test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
847 }
848
849 int drbd_resync_finished(struct drbd_device *device)
850 {
851         struct drbd_connection *connection = first_peer_device(device)->connection;
852         unsigned long db, dt, dbdt;
853         unsigned long n_oos;
854         union drbd_state os, ns;
855         struct drbd_device_work *dw;
856         char *khelper_cmd = NULL;
857         int verify_done = 0;
858
859         /* Remove all elements from the resync LRU. Since future actions
860          * might set bits in the (main) bitmap, the entries in the
861          * resync LRU would otherwise be wrong. */
862         if (drbd_rs_del_all(device)) {
863                 /* In case this is not possible now, most probably because
864                  * there are P_RS_DATA_REPLY packets lingering on the worker's
865                  * queue (or even the read operations for those packets
866                  * are not finished by now).  Retry in 100ms. */
867
868                 schedule_timeout_interruptible(HZ / 10);
869                 dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
870                 if (dw) {
871                         dw->w.cb = w_resync_finished;
872                         dw->device = device;
873                         drbd_queue_work(&connection->sender_work, &dw->w);
874                         return 1;
875                 }
876                 drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
877         }
878
879         dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
880         if (dt <= 0)
881                 dt = 1;
882
883         db = device->rs_total;
884         /* adjust for verify start and stop sectors, i.e. the position actually reached */
885         if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
886                 db -= device->ov_left;
887
888         dbdt = Bit2KB(db/dt);
889         device->rs_paused /= HZ;
890
891         if (!get_ldev(device))
892                 goto out;
893
894         ping_peer(device);
895
896         spin_lock_irq(&device->resource->req_lock);
897         os = drbd_read_state(device);
898
899         verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
900
901         /* This protects us against multiple calls (that can happen in the presence
902            of application IO), and against connectivity loss just before we arrive here. */
903         if (os.conn <= C_CONNECTED)
904                 goto out_unlock;
905
906         ns = os;
907         ns.conn = C_CONNECTED;
908
909         drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
910              verify_done ? "Online verify" : "Resync",
911              dt + device->rs_paused, device->rs_paused, dbdt);
912
913         n_oos = drbd_bm_total_weight(device);
914
915         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
916                 if (n_oos) {
917                         drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
918                               n_oos, Bit2KB(1));
919                         khelper_cmd = "out-of-sync";
920                 }
921         } else {
922                 D_ASSERT(device, (n_oos - device->rs_failed) == 0);
923
924                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
925                         khelper_cmd = "after-resync-target";
926
927                 if (device->use_csums && device->rs_total) {
928                         const unsigned long s = device->rs_same_csum;
929                         const unsigned long t = device->rs_total;
930                         const int ratio =
931                                 (t == 0)     ? 0 :
932                         (t < 100000) ? ((s*100)/t) : (s/(t/100));
933                         drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
934                              "transferred %luK total %luK\n",
935                              ratio,
936                              Bit2KB(device->rs_same_csum),
937                              Bit2KB(device->rs_total - device->rs_same_csum),
938                              Bit2KB(device->rs_total));
939                 }
940         }
941
942         if (device->rs_failed) {
943                 drbd_info(device, "            %lu failed blocks\n", device->rs_failed);
944
945                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
946                         ns.disk = D_INCONSISTENT;
947                         ns.pdsk = D_UP_TO_DATE;
948                 } else {
949                         ns.disk = D_UP_TO_DATE;
950                         ns.pdsk = D_INCONSISTENT;
951                 }
952         } else {
953                 ns.disk = D_UP_TO_DATE;
954                 ns.pdsk = D_UP_TO_DATE;
955
956                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
957                         if (device->p_uuid) {
958                                 int i;
959                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
960                                         _drbd_uuid_set(device, i, device->p_uuid[i]);
961                                 drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
962                                 _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
963                         } else {
964                                 drbd_err(device, "device->p_uuid is NULL! BUG\n");
965                         }
966                 }
967
968                 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
969                         /* for verify runs, we don't update uuids here,
970                          * so there would be nothing to report. */
971                         drbd_uuid_set_bm(device, 0UL);
972                         drbd_print_uuids(device, "updated UUIDs");
973                         if (device->p_uuid) {
974                                 /* Now the two UUID sets are equal, update what we
975                                  * know of the peer. */
976                                 int i;
977                                 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
978                                         device->p_uuid[i] = device->ldev->md.uuid[i];
979                         }
980                 }
981         }
982
983         _drbd_set_state(device, ns, CS_VERBOSE, NULL);
984 out_unlock:
985         spin_unlock_irq(&device->resource->req_lock);
986
987         /* If we have been sync source, and have an effective fencing-policy,
988          * once *all* volumes are back in sync, call "unfence". */
989         if (os.conn == C_SYNC_SOURCE) {
990                 enum drbd_disk_state disk_state = D_MASK;
991                 enum drbd_disk_state pdsk_state = D_MASK;
992                 enum drbd_fencing_p fp = FP_DONT_CARE;
993
994                 rcu_read_lock();
995                 fp = rcu_dereference(device->ldev->disk_conf)->fencing;
996                 if (fp != FP_DONT_CARE) {
997                         struct drbd_peer_device *peer_device;
998                         int vnr;
999                         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1000                                 struct drbd_device *device = peer_device->device;
1001                                 disk_state = min_t(enum drbd_disk_state, disk_state, device->state.disk);
1002                                 pdsk_state = min_t(enum drbd_disk_state, pdsk_state, device->state.pdsk);
1003                         }
1004                 }
1005                 rcu_read_unlock();
1006                 if (disk_state == D_UP_TO_DATE && pdsk_state == D_UP_TO_DATE)
1007                         conn_khelper(connection, "unfence-peer");
1008         }
1009
1010         put_ldev(device);
1011 out:
1012         device->rs_total  = 0;
1013         device->rs_failed = 0;
1014         device->rs_paused = 0;
1015
1016         /* reset start sector, if we reached end of device */
1017         if (verify_done && device->ov_left == 0)
1018                 device->ov_start_sector = 0;
1019
1020         drbd_md_sync(device);
1021
1022         if (khelper_cmd)
1023                 drbd_khelper(device, khelper_cmd);
1024
1025         return 1;
1026 }
1027
1028 /* helper: if sendpage() still references the pages, park the peer request on net_ee instead of freeing it */
1029 static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
1030 {
1031         if (drbd_peer_req_has_active_page(peer_req)) {
1032                 /* This might happen if sendpage() has not finished */
1033                 int i = PFN_UP(peer_req->i.size);
1034                 atomic_add(i, &device->pp_in_use_by_net);
1035                 atomic_sub(i, &device->pp_in_use);
1036                 spin_lock_irq(&device->resource->req_lock);
1037                 list_add_tail(&peer_req->w.list, &device->net_ee);
1038                 spin_unlock_irq(&device->resource->req_lock);
1039                 wake_up(&drbd_pp_wait);
1040         } else
1041                 drbd_free_peer_req(device, peer_req);
1042 }
1043
1044 /**
1045  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
1046  * @w:          work object.
1047  * @cancel:     The connection will be closed anyway
1048  */
1049 int w_e_end_data_req(struct drbd_work *w, int cancel)
1050 {
1051         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1052         struct drbd_peer_device *peer_device = peer_req->peer_device;
1053         struct drbd_device *device = peer_device->device;
1054         int err;
1055
1056         if (unlikely(cancel)) {
1057                 drbd_free_peer_req(device, peer_req);
1058                 dec_unacked(device);
1059                 return 0;
1060         }
1061
1062         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1063                 err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1064         } else {
1065                 if (__ratelimit(&drbd_ratelimit_state))
1066                         drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1067                             (unsigned long long)peer_req->i.sector);
1068
1069                 err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1070         }
1071
1072         dec_unacked(device);
1073
1074         move_to_net_ee_or_free(device, peer_req);
1075
1076         if (unlikely(err))
1077                 drbd_err(device, "drbd_send_block() failed\n");
1078         return err;
1079 }
1080
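/* Return true if the payload of the peer request contains only zeroes.
 * Used for thin resync: such a reply can be sent via
 * drbd_send_rs_deallocated() instead of shipping the actual data. */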
1081 static bool all_zero(struct drbd_peer_request *peer_req)
1082 {
1083         struct page *page = peer_req->pages;
1084         unsigned int len = peer_req->i.size;
1085
1086         page_chain_for_each(page) {
1087                 unsigned int l = min_t(unsigned int, len, PAGE_SIZE);
1088                 unsigned int i, words = l / sizeof(long);
1089                 unsigned long *d;
1090
1091                 d = kmap_atomic(page);
1092                 for (i = 0; i < words; i++) {
1093                         if (d[i]) {
1094                                 kunmap_atomic(d);
1095                                 return false;
1096                         }
1097                 }
1098                 kunmap_atomic(d);
1099                 len -= l;
1100         }
1101
1102         return true;
1103 }
1104
1105 /**
1106  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1107  * @w:          work object.
1108  * @cancel:     The connection will be closed anyway
1109  */
1110 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1111 {
1112         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1113         struct drbd_peer_device *peer_device = peer_req->peer_device;
1114         struct drbd_device *device = peer_device->device;
1115         int err;
1116
1117         if (unlikely(cancel)) {
1118                 drbd_free_peer_req(device, peer_req);
1119                 dec_unacked(device);
1120                 return 0;
1121         }
1122
1123         if (get_ldev_if_state(device, D_FAILED)) {
1124                 drbd_rs_complete_io(device, peer_req->i.sector);
1125                 put_ldev(device);
1126         }
1127
1128         if (device->state.conn == C_AHEAD) {
1129                 err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1130         } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1131                 if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1132                         inc_rs_pending(device);
1133                         if (peer_req->flags & EE_RS_THIN_REQ && all_zero(peer_req))
1134                                 err = drbd_send_rs_deallocated(peer_device, peer_req);
1135                         else
1136                                 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1137                 } else {
1138                         if (__ratelimit(&drbd_ratelimit_state))
1139                                 drbd_err(device, "Not sending RSDataReply, "
1140                                     "partner DISKLESS!\n");
1141                         err = 0;
1142                 }
1143         } else {
1144                 if (__ratelimit(&drbd_ratelimit_state))
1145                         drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1146                             (unsigned long long)peer_req->i.sector);
1147
1148                 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1149
1150                 /* update resync data with failure */
1151                 drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1152         }
1153
1154         dec_unacked(device);
1155
1156         move_to_net_ee_or_free(device, peer_req);
1157
1158         if (unlikely(err))
1159                 drbd_err(device, "drbd_send_block() failed\n");
1160         return err;
1161 }
1162
1163 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1164 {
1165         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1166         struct drbd_peer_device *peer_device = peer_req->peer_device;
1167         struct drbd_device *device = peer_device->device;
1168         struct digest_info *di;
1169         int digest_size;
1170         void *digest = NULL;
1171         int err, eq = 0;
1172
1173         if (unlikely(cancel)) {
1174                 drbd_free_peer_req(device, peer_req);
1175                 dec_unacked(device);
1176                 return 0;
1177         }
1178
1179         if (get_ldev(device)) {
1180                 drbd_rs_complete_io(device, peer_req->i.sector);
1181                 put_ldev(device);
1182         }
1183
1184         di = peer_req->digest;
1185
1186         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1187                 /* quick hack to try to avoid a race against reconfiguration.
1188                  * a real fix would be much more involved,
1189                  * introducing more locking mechanisms */
1190                 if (peer_device->connection->csums_tfm) {
1191                         digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm);
1192                         D_ASSERT(device, digest_size == di->digest_size);
1193                         digest = kmalloc(digest_size, GFP_NOIO);
1194                 }
1195                 if (digest) {
1196                         drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1197                         eq = !memcmp(digest, di->digest, digest_size);
1198                         kfree(digest);
1199                 }
1200
1201                 if (eq) {
1202                         drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1203                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1204                         device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1205                         err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1206                 } else {
1207                         inc_rs_pending(device);
1208                         peer_req->block_id = ID_SYNCER; /* By setting block_id, the digest pointer becomes invalid! */
1209                         peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1210                         kfree(di);
1211                         err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1212                 }
1213         } else {
1214                 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1215                 if (__ratelimit(&drbd_ratelimit_state))
1216                         drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1217         }
1218
1219         dec_unacked(device);
1220         move_to_net_ee_or_free(device, peer_req);
1221
1222         if (unlikely(err))
1223                 drbd_err(device, "drbd_send_block/ack() failed\n");
1224         return err;
1225 }
1226
1227 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1228 {
1229         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1230         struct drbd_peer_device *peer_device = peer_req->peer_device;
1231         struct drbd_device *device = peer_device->device;
1232         sector_t sector = peer_req->i.sector;
1233         unsigned int size = peer_req->i.size;
1234         int digest_size;
1235         void *digest;
1236         int err = 0;
1237
1238         if (unlikely(cancel))
1239                 goto out;
1240
1241         digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm);
1242         digest = kmalloc(digest_size, GFP_NOIO);
1243         if (!digest) {
1244                 err = 1;        /* terminate the connection in case the allocation failed */
1245                 goto out;
1246         }
1247
1248         if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1249                 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1250         else
1251                 memset(digest, 0, digest_size);
1252
1253         /* Free peer_req and pages before send.
1254          * In case we block on congestion, we could otherwise run into
1255          * some distributed deadlock, if the other side blocks on
1256          * congestion as well, because our receiver blocks in
1257          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1258         drbd_free_peer_req(device, peer_req);
1259         peer_req = NULL;
1260         inc_rs_pending(device);
1261         err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1262         if (err)
1263                 dec_rs_pending(device);
1264         kfree(digest);
1265
1266 out:
1267         if (peer_req)
1268                 drbd_free_peer_req(device, peer_req);
1269         dec_unacked(device);
1270         return err;
1271 }
1272
1273 void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1274 {
1275         if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1276                 device->ov_last_oos_size += size>>9;
1277         } else {
1278                 device->ov_last_oos_start = sector;
1279                 device->ov_last_oos_size = size>>9;
1280         }
1281         drbd_set_out_of_sync(device, sector, size);
1282 }
1283
1284 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1285 {
1286         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1287         struct drbd_peer_device *peer_device = peer_req->peer_device;
1288         struct drbd_device *device = peer_device->device;
1289         struct digest_info *di;
1290         void *digest;
1291         sector_t sector = peer_req->i.sector;
1292         unsigned int size = peer_req->i.size;
1293         int digest_size;
1294         int err, eq = 0;
1295         bool stop_sector_reached = false;
1296
1297         if (unlikely(cancel)) {
1298                 drbd_free_peer_req(device, peer_req);
1299                 dec_unacked(device);
1300                 return 0;
1301         }
1302
1303         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1304          * the resync lru has been cleaned up already */
1305         if (get_ldev(device)) {
1306                 drbd_rs_complete_io(device, peer_req->i.sector);
1307                 put_ldev(device);
1308         }
1309
1310         di = peer_req->digest;
1311
1312         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1313                 digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm);
1314                 digest = kmalloc(digest_size, GFP_NOIO);
1315                 if (digest) {
1316                         drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1317
1318                         D_ASSERT(device, digest_size == di->digest_size);
1319                         eq = !memcmp(digest, di->digest, digest_size);
1320                         kfree(digest);
1321                 }
1322         }
1323
1324         /* Free peer_req and pages before send.
1325          * In case we block on congestion, we could otherwise run into
1326          * some distributed deadlock, if the other side blocks on
1327          * congestion as well, because our receiver blocks in
1328          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1329         drbd_free_peer_req(device, peer_req);
1330         if (!eq)
1331                 drbd_ov_out_of_sync_found(device, sector, size);
1332         else
1333                 ov_out_of_sync_print(device);
1334
1335         err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1336                                eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1337
1338         dec_unacked(device);
1339
1340         --device->ov_left;
1341
1342         /* let's advance progress step marks only for every other megabyte */
1343         if ((device->ov_left & 0x200) == 0x200)
1344                 drbd_advance_rs_marks(device, device->ov_left);
1345
1346         stop_sector_reached = verify_can_do_stop_sector(device) &&
1347                 (sector + (size>>9)) >= device->ov_stop_sector;
1348
1349         if (device->ov_left == 0 || stop_sector_reached) {
1350                 ov_out_of_sync_print(device);
1351                 drbd_resync_finished(device);
1352         }
1353
1354         return err;
1355 }
1356
1357 /* FIXME
1358  * We need to track the number of pending barrier acks,
1359  * and to be able to wait for them.
1360  * See also comment in drbd_adm_attach before drbd_suspend_io.
1361  */
1362 static int drbd_send_barrier(struct drbd_connection *connection)
1363 {
1364         struct p_barrier *p;
1365         struct drbd_socket *sock;
1366
1367         sock = &connection->data;
1368         p = conn_prepare_command(connection, sock);
1369         if (!p)
1370                 return -EIO;
1371         p->barrier = connection->send.current_epoch_nr;
1372         p->pad = 0;
1373         connection->send.current_epoch_writes = 0;
1374         connection->send.last_sent_barrier_jif = jiffies;
1375
1376         return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1377 }
1378
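/* Send a P_UNPLUG_REMOTE hint on the data socket, asking the peer to
 * unplug (kick) its backing device queue. */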
1379 static int pd_send_unplug_remote(struct drbd_peer_device *pd)
1380 {
1381         struct drbd_socket *sock = &pd->connection->data;
1382         if (!drbd_prepare_command(pd, sock))
1383                 return -EIO;
1384         return drbd_send_command(pd, sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1385 }
1386
1387 int w_send_write_hint(struct drbd_work *w, int cancel)
1388 {
1389         struct drbd_device *device =
1390                 container_of(w, struct drbd_device, unplug_work);
1391
1392         if (cancel)
1393                 return 0;
1394         return pd_send_unplug_remote(first_peer_device(device));
1395 }
1396
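/* On the first write ever sent over this connection, initialize the epoch
 * bookkeeping used to decide when barriers have to be sent. */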
1397 static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1398 {
1399         if (!connection->send.seen_any_write_yet) {
1400                 connection->send.seen_any_write_yet = true;
1401                 connection->send.current_epoch_nr = epoch;
1402                 connection->send.current_epoch_writes = 0;
1403                 connection->send.last_sent_barrier_jif = jiffies;
1404         }
1405 }
1406
1407 static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1408 {
1409         /* nothing to close yet before the first write on this connection;
1410          * the re-init is handled by re_init_if_first_write() */
1410         if (!connection->send.seen_any_write_yet)
1411                 return;
1412         if (connection->send.current_epoch_nr != epoch) {
1413                 if (connection->send.current_epoch_writes)
1414                         drbd_send_barrier(connection);
1415                 connection->send.current_epoch_nr = epoch;
1416         }
1417 }
1418
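/* Worker callback used while the connection is in Ahead/Behind mode:
 * instead of mirroring the write, send P_OUT_OF_SYNC so the peer can mark
 * the affected blocks as out of sync for a later resync. */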
1419 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1420 {
1421         struct drbd_request *req = container_of(w, struct drbd_request, w);
1422         struct drbd_device *device = req->device;
1423         struct drbd_peer_device *const peer_device = first_peer_device(device);
1424         struct drbd_connection *const connection = peer_device->connection;
1425         int err;
1426
1427         if (unlikely(cancel)) {
1428                 req_mod(req, SEND_CANCELED);
1429                 return 0;
1430         }
1431         req->pre_send_jif = jiffies;
1432
1433         /* This time, no connection->send.current_epoch_writes++;
1434          * if a barrier is sent here, it is the one closing the last
1435          * replicated epoch before we went into AHEAD mode.
1436          * No more barriers will be sent until we leave AHEAD mode again. */
1437         maybe_send_barrier(connection, req->epoch);
1438
1439         err = drbd_send_out_of_sync(peer_device, req);
1440         req_mod(req, OOS_HANDED_TO_NETWORK);
1441
1442         return err;
1443 }
1444
1445 /**
1446  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1447  * @w:          work object.
1448  * @cancel:     The connection will be closed anyway
1449  */
1450 int w_send_dblock(struct drbd_work *w, int cancel)
1451 {
1452         struct drbd_request *req = container_of(w, struct drbd_request, w);
1453         struct drbd_device *device = req->device;
1454         struct drbd_peer_device *const peer_device = first_peer_device(device);
1455         struct drbd_connection *connection = peer_device->connection;
1456         bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1457         int err;
1458
1459         if (unlikely(cancel)) {
1460                 req_mod(req, SEND_CANCELED);
1461                 return 0;
1462         }
1463         req->pre_send_jif = jiffies;
1464
1465         re_init_if_first_write(connection, req->epoch);
1466         maybe_send_barrier(connection, req->epoch);
1467         connection->send.current_epoch_writes++;
1468
1469         err = drbd_send_dblock(peer_device, req);
1470         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1471
1472         if (do_send_unplug && !err)
1473                 pd_send_unplug_remote(peer_device);
1474
1475         return err;
1476 }
1477
1478 /**
1479  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1480  * @w:          work object.
1481  * @cancel:     The connection will be closed anyway
1482  */
1483 int w_send_read_req(struct drbd_work *w, int cancel)
1484 {
1485         struct drbd_request *req = container_of(w, struct drbd_request, w);
1486         struct drbd_device *device = req->device;
1487         struct drbd_peer_device *const peer_device = first_peer_device(device);
1488         struct drbd_connection *connection = peer_device->connection;
1489         bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1490         int err;
1491
1492         if (unlikely(cancel)) {
1493                 req_mod(req, SEND_CANCELED);
1494                 return 0;
1495         }
1496         req->pre_send_jif = jiffies;
1497
1498         /* Even read requests may close a write epoch,
1499          * if one was still open. */
1500         maybe_send_barrier(connection, req->epoch);
1501
1502         err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1503                                  (unsigned long)req);
1504
1505         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1506
1507         if (do_send_unplug && !err)
1508                 pd_send_unplug_remote(peer_device);
1509
1510         return err;
1511 }
1512
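/* Worker callback to restart local disk IO for a request: re-enter the
 * activity log for writes that were in it, then clone the master bio again
 * and submit it to the backing device. */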
1513 int w_restart_disk_io(struct drbd_work *w, int cancel)
1514 {
1515         struct drbd_request *req = container_of(w, struct drbd_request, w);
1516         struct drbd_device *device = req->device;
1517
1518         if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1519                 drbd_al_begin_io(device, &req->i);
1520
1521         req->private_bio = bio_alloc_clone(device->ldev->backing_bdev,
1522                                            req->master_bio, GFP_NOIO,
1523                                           &drbd_io_bio_set);
1524         req->private_bio->bi_private = req;
1525         req->private_bio->bi_end_io = drbd_request_endio;
1526         submit_bio_noacct(req->private_bio);
1527
1528         return 0;
1529 }
1530
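/* Walk the resync-after dependency chain: returns 1 if this device may
 * resync now, 0 if a device it depends on is currently resyncing or has
 * its sync paused. */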
1531 static int _drbd_may_sync_now(struct drbd_device *device)
1532 {
1533         struct drbd_device *odev = device;
1534         int resync_after;
1535
1536         while (1) {
1537                 if (!odev->ldev || odev->state.disk == D_DISKLESS)
1538                         return 1;
1539                 rcu_read_lock();
1540                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1541                 rcu_read_unlock();
1542                 if (resync_after == -1)
1543                         return 1;
1544                 odev = minor_to_device(resync_after);
1545                 if (!odev)
1546                         return 1;
1547                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1548                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1549                     odev->state.aftr_isp || odev->state.peer_isp ||
1550                     odev->state.user_isp)
1551                         return 0;
1552         }
1553 }
1554
1555 /**
1556  * drbd_pause_after() - Pause resync on all devices that may not resync now
1557  * @device:     DRBD device.
1558  *
1559  * Called from process context only (admin command and after_state_ch).
1560  */
1561 static bool drbd_pause_after(struct drbd_device *device)
1562 {
1563         bool changed = false;
1564         struct drbd_device *odev;
1565         int i;
1566
1567         rcu_read_lock();
1568         idr_for_each_entry(&drbd_devices, odev, i) {
1569                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1570                         continue;
1571                 if (!_drbd_may_sync_now(odev) &&
1572                     _drbd_set_state(_NS(odev, aftr_isp, 1),
1573                                     CS_HARD, NULL) != SS_NOTHING_TO_DO)
1574                         changed = true;
1575         }
1576         rcu_read_unlock();
1577
1578         return changed;
1579 }
1580
1581 /**
1582  * drbd_resume_next() - Resume resync on all devices that may resync now
1583  * @device:     DRBD device.
1584  *
1585  * Called from process context only (admin command and worker).
1586  */
1587 static bool drbd_resume_next(struct drbd_device *device)
1588 {
1589         bool changed = false;
1590         struct drbd_device *odev;
1591         int i;
1592
1593         rcu_read_lock();
1594         idr_for_each_entry(&drbd_devices, odev, i) {
1595                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1596                         continue;
1597                 if (odev->state.aftr_isp) {
1598                         if (_drbd_may_sync_now(odev) &&
1599                             _drbd_set_state(_NS(odev, aftr_isp, 0),
1600                                             CS_HARD, NULL) != SS_NOTHING_TO_DO)
1601                                 changed = true;
1602                 }
1603         }
1604         rcu_read_unlock();
1605         return changed;
1606 }
1607
1608 void resume_next_sg(struct drbd_device *device)
1609 {
1610         lock_all_resources();
1611         drbd_resume_next(device);
1612         unlock_all_resources();
1613 }
1614
1615 void suspend_other_sg(struct drbd_device *device)
1616 {
1617         lock_all_resources();
1618         drbd_pause_after(device);
1619         unlock_all_resources();
1620 }
1621
1622 /* caller must lock_all_resources() */
1623 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1624 {
1625         struct drbd_device *odev;
1626         int resync_after;
1627
1628         if (o_minor == -1)
1629                 return NO_ERROR;
1630         if (o_minor < -1 || o_minor > MINORMASK)
1631                 return ERR_RESYNC_AFTER;
1632
1633         /* check for loops */
1634         odev = minor_to_device(o_minor);
1635         while (1) {
1636                 if (odev == device)
1637                         return ERR_RESYNC_AFTER_CYCLE;
1638
1639                 /* You are free to depend on diskless, non-existing,
1640                  * or not yet/no longer existing minors.
1641                  * We only reject dependency loops.
1642                  * We cannot follow the dependency chain beyond a detached or
1643                  * missing minor.
1644                  */
1645                 if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1646                         return NO_ERROR;
1647
1648                 rcu_read_lock();
1649                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1650                 rcu_read_unlock();
1651                 /* dependency chain ends here, no cycles. */
1652                 if (resync_after == -1)
1653                         return NO_ERROR;
1654
1655                 /* follow the dependency chain */
1656                 odev = minor_to_device(resync_after);
1657         }
1658 }
1659
1660 /* caller must lock_all_resources() */
1661 void drbd_resync_after_changed(struct drbd_device *device)
1662 {
1663         int changed;
1664
1665         do {
1666                 changed  = drbd_pause_after(device);
1667                 changed |= drbd_resume_next(device);
1668         } while (changed);
1669 }
1670
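/* Reset the resync rate controller: clear the sector and event counters
 * and empty the fifo plan (in place, since we may be in atomic context). */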
1671 void drbd_rs_controller_reset(struct drbd_device *device)
1672 {
1673         struct gendisk *disk = device->ldev->backing_bdev->bd_disk;
1674         struct fifo_buffer *plan;
1675
1676         atomic_set(&device->rs_sect_in, 0);
1677         atomic_set(&device->rs_sect_ev, 0);
1678         device->rs_in_flight = 0;
1679         device->rs_last_events =
1680                 (int)part_stat_read_accum(disk->part0, sectors);
1681
1682         /* Updating the RCU protected object in place is necessary since
1683            this function gets called from atomic context.
1684            It is valid since all other updates also lead to a completely
1685            empty fifo. */
1686         rcu_read_lock();
1687         plan = rcu_dereference(device->rs_plan_s);
1688         plan->total = 0;
1689         fifo_set(plan, 0);
1690         rcu_read_unlock();
1691 }
1692
1693 void start_resync_timer_fn(struct timer_list *t)
1694 {
1695         struct drbd_device *device = from_timer(device, t, start_resync_timer);
1696         drbd_device_post_work(device, RS_START);
1697 }
1698
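/* Start a postponed resync (RS_START device work): retry later while
 * unacked or pending resync replies are outstanding, otherwise start the
 * resync as sync source (Ahead -> SyncSource transition). */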
1699 static void do_start_resync(struct drbd_device *device)
1700 {
1701         if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1702                 drbd_warn(device, "postponing start_resync ...\n");
1703                 device->start_resync_timer.expires = jiffies + HZ/10;
1704                 add_timer(&device->start_resync_timer);
1705                 return;
1706         }
1707
1708         drbd_start_resync(device, C_SYNC_SOURCE);
1709         clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1710 }
1711
1712 static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1713 {
1714         bool csums_after_crash_only;
1715         rcu_read_lock();
1716         csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1717         rcu_read_unlock();
1718         return connection->agreed_pro_version >= 89 &&          /* supported? */
1719                 connection->csums_tfm &&                        /* configured? */
1720                 (csums_after_crash_only == false                /* use for each resync? */
1721                  || test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */
1722 }
1723
1724 /**
1725  * drbd_start_resync() - Start the resync process
1726  * @device:     DRBD device.
1727  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1728  *
1729  * This function might bring you directly into one of the
1730  * C_PAUSED_SYNC_* states.
1731  */
1732 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1733 {
1734         struct drbd_peer_device *peer_device = first_peer_device(device);
1735         struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1736         union drbd_state ns;
1737         int r;
1738
1739         if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1740                 drbd_err(device, "Resync already running!\n");
1741                 return;
1742         }
1743
1744         if (!connection) {
1745                 drbd_err(device, "No connection to peer, aborting!\n");
1746                 return;
1747         }
1748
1749         if (!test_bit(B_RS_H_DONE, &device->flags)) {
1750                 if (side == C_SYNC_TARGET) {
1751                         /* Since application IO was locked out during C_WF_BITMAP_T and
1752                            C_WF_SYNC_UUID, our data is still unmodified. Before going to
1753                            C_SYNC_TARGET, give the handler a chance to abort. */
1754                         r = drbd_khelper(device, "before-resync-target");
1755                         r = (r >> 8) & 0xff;
1756                         if (r > 0) {
1757                                 drbd_info(device, "before-resync-target handler returned %d, "
1758                                          "dropping connection.\n", r);
1759                                 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1760                                 return;
1761                         }
1762                 } else /* C_SYNC_SOURCE */ {
1763                         r = drbd_khelper(device, "before-resync-source");
1764                         r = (r >> 8) & 0xff;
1765                         if (r > 0) {
1766                                 if (r == 3) {
1767                                         drbd_info(device, "before-resync-source handler returned %d, "
1768                                                  "ignoring. Old userland tools?\n", r);
1769                                 } else {
1770                                         drbd_info(device, "before-resync-source handler returned %d, "
1771                                                  "dropping connection.\n", r);
1772                                         conn_request_state(connection,
1773                                                            NS(conn, C_DISCONNECTING), CS_HARD);
1774                                         return;
1775                                 }
1776                         }
1777                 }
1778         }
1779
1780         if (current == connection->worker.task) {
1781                 /* The worker should not sleep waiting for state_mutex,
1782                    as that can take a long time. */
1783                 if (!mutex_trylock(device->state_mutex)) {
1784                         set_bit(B_RS_H_DONE, &device->flags);
1785                         device->start_resync_timer.expires = jiffies + HZ/5;
1786                         add_timer(&device->start_resync_timer);
1787                         return;
1788                 }
1789         } else {
1790                 mutex_lock(device->state_mutex);
1791         }
1792
1793         lock_all_resources();
1794         clear_bit(B_RS_H_DONE, &device->flags);
1795         /* Did some connection breakage or IO error race with us? */
1796         if (device->state.conn < C_CONNECTED ||
1797             !get_ldev_if_state(device, D_NEGOTIATING)) {
1798                 unlock_all_resources();
1799                 goto out;
1800         }
1801
1802         ns = drbd_read_state(device);
1803
1804         ns.aftr_isp = !_drbd_may_sync_now(device);
1805
1806         ns.conn = side;
1807
1808         if (side == C_SYNC_TARGET)
1809                 ns.disk = D_INCONSISTENT;
1810         else /* side == C_SYNC_SOURCE */
1811                 ns.pdsk = D_INCONSISTENT;
1812
1813         r = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
1814         ns = drbd_read_state(device);
1815
1816         if (ns.conn < C_CONNECTED)
1817                 r = SS_UNKNOWN_ERROR;
1818
1819         if (r == SS_SUCCESS) {
1820                 unsigned long tw = drbd_bm_total_weight(device);
1821                 unsigned long now = jiffies;
1822                 int i;
1823
1824                 device->rs_failed    = 0;
1825                 device->rs_paused    = 0;
1826                 device->rs_same_csum = 0;
1827                 device->rs_last_sect_ev = 0;
1828                 device->rs_total     = tw;
1829                 device->rs_start     = now;
1830                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1831                         device->rs_mark_left[i] = tw;
1832                         device->rs_mark_time[i] = now;
1833                 }
1834                 drbd_pause_after(device);
1835                 /* Forget potentially stale cached per-resync-extent bit counts.
1836                  * This open-codes drbd_rs_cancel_all(device); we already have IRQs
1837                  * disabled, and know the disk state is ok. */
1838                 spin_lock(&device->al_lock);
1839                 lc_reset(device->resync);
1840                 device->resync_locked = 0;
1841                 device->resync_wenr = LC_FREE;
1842                 spin_unlock(&device->al_lock);
1843         }
1844         unlock_all_resources();
1845
1846         if (r == SS_SUCCESS) {
1847                 wake_up(&device->al_wait); /* for lc_reset() above */
1848                 /* reset rs_last_bcast when a resync or verify is started,
1849                  * to deal with potential jiffies wrap. */
1850                 device->rs_last_bcast = jiffies - HZ;
1851
1852                 drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1853                      drbd_conn_str(ns.conn),
1854                      (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1855                      (unsigned long) device->rs_total);
1856                 if (side == C_SYNC_TARGET) {
1857                         device->bm_resync_fo = 0;
1858                         device->use_csums = use_checksum_based_resync(connection, device);
1859                 } else {
1860                         device->use_csums = false;
1861                 }
1862
1863                 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1864                  * with w_send_oos, or the sync target will get confused as to
1865                  * how many bits to resync.  We cannot always do that, because for an
1866                  * empty resync and protocol < 95, we need to do it here, as we call
1867                  * drbd_resync_finished from here in that case.
1868                  * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1869                  * and from after_state_ch otherwise. */
1870                 if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1871                         drbd_gen_and_send_sync_uuid(peer_device);
1872
1873                 if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1874                         /* This still has a race (about when exactly the peers
1875                          * detect connection loss) that can lead to a full sync
1876                          * on next handshake. In 8.3.9 we fixed this with explicit
1877                          * resync-finished notifications, but the fix
1878                          * introduces a protocol change.  Sleeping for some
1879                          * time longer than the ping interval + timeout on the
1880                          * SyncSource, to give the SyncTarget the chance to
1881                          * detect connection loss, then waiting for a ping
1882                          * response (implicit in drbd_resync_finished) reduces
1883                          * the race considerably, but does not solve it. */
1884                         if (side == C_SYNC_SOURCE) {
1885                                 struct net_conf *nc;
1886                                 int timeo;
1887
1888                                 rcu_read_lock();
1889                                 nc = rcu_dereference(connection->net_conf);
1890                                 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1891                                 rcu_read_unlock();
1892                                 schedule_timeout_interruptible(timeo);
1893                         }
1894                         drbd_resync_finished(device);
1895                 }
1896
1897                 drbd_rs_controller_reset(device);
1898                 /* ns.conn may already be != device->state.conn,
1899                  * we may have been paused in between, or become paused until
1900                  * the timer triggers.
1901                  * Either way, that is handled in resync_timer_fn(). */
1902                 if (ns.conn == C_SYNC_TARGET)
1903                         mod_timer(&device->resync_timer, jiffies);
1904
1905                 drbd_md_sync(device);
1906         }
1907         put_ldev(device);
1908 out:
1909         mutex_unlock(device->state_mutex);
1910 }
1911
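/* Lazily write out changed bitmap pages, finish the resync if it is done,
 * and broadcast the sync progress to userspace. */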
1912 static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
1913 {
1914         struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1915         device->rs_last_bcast = jiffies;
1916
1917         if (!get_ldev(device))
1918                 return;
1919
1920         drbd_bm_write_lazy(device, 0);
1921         if (resync_done && is_sync_state(device->state.conn))
1922                 drbd_resync_finished(device);
1923
1924         drbd_bcast_event(device, &sib);
1925         /* update timestamp, in case it took a while to write out stuff */
1926         device->rs_last_bcast = jiffies;
1927         put_ldev(device);
1928 }
1929
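/* Final cleanup when the local disk goes away: free the resync and
 * activity log LRU caches, release the backing device, and wake up
 * anyone waiting for the detach to complete. */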
1930 static void drbd_ldev_destroy(struct drbd_device *device)
1931 {
1932         lc_destroy(device->resync);
1933         device->resync = NULL;
1934         lc_destroy(device->act_log);
1935         device->act_log = NULL;
1936
1937         __acquire(local);
1938         drbd_backing_dev_free(device, device->ldev);
1939         device->ldev = NULL;
1940         __release(local);
1941
1942         clear_bit(GOING_DISKLESS, &device->flags);
1943         wake_up(&device->misc_wait);
1944 }
1945
1946 static void go_diskless(struct drbd_device *device)
1947 {
1948         D_ASSERT(device, device->state.disk == D_FAILED);
1949         /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1950          * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1951          * the protected members anymore, though, so once put_ldev reaches zero
1952          * again, it will be safe to free them. */
1953
1954         /* Try to write changed bitmap pages, read errors may have just
1955          * set some bits outside the area covered by the activity log.
1956          *
1957          * If we have an IO error during the bitmap writeout,
1958          * we will want a full sync next time, just in case.
1959          * (Do we want a specific meta data flag for this?)
1960          *
1961          * If that does not make it to stable storage either,
1962          * we cannot do anything about that anymore.
1963          *
1964          * We still need to check if both bitmap and ldev are present, we may
1965          * end up here after a failed attach, before ldev was even assigned.
1966          */
1967         if (device->bitmap && device->ldev) {
1968                 /* An interrupted resync or similar is allowed to recount bits
1969                  * while we detach.
1970                  * Any modifications would not be expected anymore, though.
1971                  */
1972                 if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1973                                         "detach", BM_LOCKED_TEST_ALLOWED)) {
1974                         if (test_bit(WAS_READ_ERROR, &device->flags)) {
1975                                 drbd_md_set_flag(device, MDF_FULL_SYNC);
1976                                 drbd_md_sync(device);
1977                         }
1978                 }
1979         }
1980
1981         drbd_force_state(device, NS(disk, D_DISKLESS));
1982 }
1983
1984 static int do_md_sync(struct drbd_device *device)
1985 {
1986         drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
1987         drbd_md_sync(device);
1988         return 0;
1989 }
1990
1991 /* only called from drbd_worker thread, no locking */
1992 void __update_timing_details(
1993                 struct drbd_thread_timing_details *tdp,
1994                 unsigned int *cb_nr,
1995                 void *cb,
1996                 const char *fn, const unsigned int line)
1997 {
1998         unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
1999         struct drbd_thread_timing_details *td = tdp + i;
2000
2001         td->start_jif = jiffies;
2002         td->cb_addr = cb;
2003         td->caller_fn = fn;
2004         td->line = line;
2005         td->cb_nr = *cb_nr;
2006
2007         i = (i+1) % DRBD_THREAD_DETAILS_HIST;
2008         td = tdp + i;
2009         memset(td, 0, sizeof(*td));
2010
2011         ++(*cb_nr);
2012 }
2013
2014 static void do_device_work(struct drbd_device *device, const unsigned long todo)
2015 {
2016         if (test_bit(MD_SYNC, &todo))
2017                 do_md_sync(device);
2018         if (test_bit(RS_DONE, &todo) ||
2019             test_bit(RS_PROGRESS, &todo))
2020                 update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
2021         if (test_bit(GO_DISKLESS, &todo))
2022                 go_diskless(device);
2023         if (test_bit(DESTROY_DISK, &todo))
2024                 drbd_ldev_destroy(device);
2025         if (test_bit(RS_START, &todo))
2026                 do_start_resync(device);
2027 }
2028
2029 #define DRBD_DEVICE_WORK_MASK   \
2030         ((1UL << GO_DISKLESS)   \
2031         |(1UL << DESTROY_DISK)  \
2032         |(1UL << MD_SYNC)       \
2033         |(1UL << RS_START)      \
2034         |(1UL << RS_PROGRESS)   \
2035         |(1UL << RS_DONE)       \
2036         )
2037
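/* Atomically fetch-and-clear the device work bits in *flags (cmpxchg loop);
 * returns the bits that were set. */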
2038 static unsigned long get_work_bits(unsigned long *flags)
2039 {
2040         unsigned long old, new;
2041         do {
2042                 old = *flags;
2043                 new = old & ~DRBD_DEVICE_WORK_MASK;
2044         } while (cmpxchg(flags, old, new) != old);
2045         return old & DRBD_DEVICE_WORK_MASK;
2046 }
2047
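/* Walk all peer devices of this connection and handle any pending device
 * work bits, holding a device reference while doing so. */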
2048 static void do_unqueued_work(struct drbd_connection *connection)
2049 {
2050         struct drbd_peer_device *peer_device;
2051         int vnr;
2052
2053         rcu_read_lock();
2054         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2055                 struct drbd_device *device = peer_device->device;
2056                 unsigned long todo = get_work_bits(&device->flags);
2057                 if (!todo)
2058                         continue;
2059
2060                 kref_get(&device->kref);
2061                 rcu_read_unlock();
2062                 do_device_work(device, todo);
2063                 kref_put(&device->kref, drbd_destroy_device);
2064                 rcu_read_lock();
2065         }
2066         rcu_read_unlock();
2067 }
2068
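/* Splice all currently queued work items onto @work_list in one batch;
 * returns true if @work_list is non-empty afterwards. */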
2069 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
2070 {
2071         spin_lock_irq(&queue->q_lock);
2072         list_splice_tail_init(&queue->q, work_list);
2073         spin_unlock_irq(&queue->q_lock);
2074         return !list_empty(work_list);
2075 }
2076
2077 static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
2078 {
2079         DEFINE_WAIT(wait);
2080         struct net_conf *nc;
2081         int uncork, cork;
2082
2083         dequeue_work_batch(&connection->sender_work, work_list);
2084         if (!list_empty(work_list))
2085                 return;
2086
2087         /* Still nothing to do?
2088          * Maybe we still need to close the current epoch,
2089          * even if no new requests are queued yet.
2090          *
2091          * Also, poke TCP, just in case.
2092          * Then wait for new work (or signal). */
2093         rcu_read_lock();
2094         nc = rcu_dereference(connection->net_conf);
2095         uncork = nc ? nc->tcp_cork : 0;
2096         rcu_read_unlock();
2097         if (uncork) {
2098                 mutex_lock(&connection->data.mutex);
2099                 if (connection->data.socket)
2100                         tcp_sock_set_cork(connection->data.socket->sk, false);
2101                 mutex_unlock(&connection->data.mutex);
2102         }
2103
2104         for (;;) {
2105                 int send_barrier;
2106                 prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
2107                 spin_lock_irq(&connection->resource->req_lock);
2108                 spin_lock(&connection->sender_work.q_lock);     /* FIXME get rid of this one? */
2109                 if (!list_empty(&connection->sender_work.q))
2110                         list_splice_tail_init(&connection->sender_work.q, work_list);
2111                 spin_unlock(&connection->sender_work.q_lock);   /* FIXME get rid of this one? */
2112                 if (!list_empty(work_list) || signal_pending(current)) {
2113                         spin_unlock_irq(&connection->resource->req_lock);
2114                         break;
2115                 }
2116
2117                 /* We found nothing new to do, no to-be-communicated request,
2118                  * no other work item.  We may still need to close the last
2119                  * epoch.  Next incoming request epoch will be connection ->
2120                  * current transfer log epoch number.  If that is different
2121                  * from the epoch of the last request we communicated, it is
2122                  * safe to send the epoch separating barrier now.
2123                  */
2124                 send_barrier =
2125                         atomic_read(&connection->current_tle_nr) !=
2126                         connection->send.current_epoch_nr;
2127                 spin_unlock_irq(&connection->resource->req_lock);
2128
2129                 if (send_barrier)
2130                         maybe_send_barrier(connection,
2131                                         connection->send.current_epoch_nr + 1);
2132
2133                 if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
2134                         break;
2135
2136                 /* drbd_send() may have called flush_signals() */
2137                 if (get_t_state(&connection->worker) != RUNNING)
2138                         break;
2139
2140                 schedule();
2141                 /* We may be woken up for things other than new work, too,
2142                  * e.g. if the current epoch got closed;
2143                  * in that case we send the barrier above. */
2144         }
2145         finish_wait(&connection->sender_work.q_wait, &wait);
2146
2147         /* someone may have changed the config while we have been waiting above. */
2148         rcu_read_lock();
2149         nc = rcu_dereference(connection->net_conf);
2150         cork = nc ? nc->tcp_cork : 0;
2151         rcu_read_unlock();
2152         mutex_lock(&connection->data.mutex);
2153         if (connection->data.socket) {
2154                 if (cork)
2155                         tcp_sock_set_cork(connection->data.socket->sk, true);
2156                 else if (!uncork)
2157                         tcp_sock_set_cork(connection->data.socket->sk, false);
2158         }
2159         mutex_unlock(&connection->data.mutex);
2160 }
2161
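/* Main loop of the per-connection worker thread: wait for work, handle
 * unqueued device work and queued work callbacks until told to stop,
 * then drain whatever is left and clean up the devices. */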
2162 int drbd_worker(struct drbd_thread *thi)
2163 {
2164         struct drbd_connection *connection = thi->connection;
2165         struct drbd_work *w = NULL;
2166         struct drbd_peer_device *peer_device;
2167         LIST_HEAD(work_list);
2168         int vnr;
2169
2170         while (get_t_state(thi) == RUNNING) {
2171                 drbd_thread_current_set_cpu(thi);
2172
2173                 if (list_empty(&work_list)) {
2174                         update_worker_timing_details(connection, wait_for_work);
2175                         wait_for_work(connection, &work_list);
2176                 }
2177
2178                 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2179                         update_worker_timing_details(connection, do_unqueued_work);
2180                         do_unqueued_work(connection);
2181                 }
2182
2183                 if (signal_pending(current)) {
2184                         flush_signals(current);
2185                         if (get_t_state(thi) == RUNNING) {
2186                                 drbd_warn(connection, "Worker got an unexpected signal\n");
2187                                 continue;
2188                         }
2189                         break;
2190                 }
2191
2192                 if (get_t_state(thi) != RUNNING)
2193                         break;
2194
2195                 if (!list_empty(&work_list)) {
2196                         w = list_first_entry(&work_list, struct drbd_work, list);
2197                         list_del_init(&w->list);
2198                         update_worker_timing_details(connection, w->cb);
2199                         if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
2200                                 continue;
2201                         if (connection->cstate >= C_WF_REPORT_PARAMS)
2202                                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
2203                 }
2204         }
2205
2206         do {
2207                 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2208                         update_worker_timing_details(connection, do_unqueued_work);
2209                         do_unqueued_work(connection);
2210                 }
2211                 if (!list_empty(&work_list)) {
2212                         w = list_first_entry(&work_list, struct drbd_work, list);
2213                         list_del_init(&w->list);
2214                         update_worker_timing_details(connection, w->cb);
2215                         w->cb(w, 1);
2216                 } else
2217                         dequeue_work_batch(&connection->sender_work, &work_list);
2218         } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
2219
2220         rcu_read_lock();
2221         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2222                 struct drbd_device *device = peer_device->device;
2223                 D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2224                 kref_get(&device->kref);
2225                 rcu_read_unlock();
2226                 drbd_device_cleanup(device);
2227                 kref_put(&device->kref, drbd_destroy_device);
2228                 rcu_read_lock();
2229         }
2230         rcu_read_unlock();
2231
2232         return 0;
2233 }