GNU Linux-libre 4.9.315-gnu1
drivers/block/drbd/drbd_receiver.c
1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <linux/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_protocol.h"
48 #include "drbd_req.h"
49 #include "drbd_vli.h"
50
51 #define PRO_FEATURES (DRBD_FF_TRIM|DRBD_FF_THIN_RESYNC|DRBD_FF_WSAME)
52
53 struct packet_info {
54         enum drbd_packet cmd;
55         unsigned int size;
56         unsigned int vnr;
57         void *data;
58 };
59
60 enum finish_epoch {
61         FE_STILL_LIVE,
62         FE_DESTROYED,
63         FE_RECYCLED,
64 };
65
66 static int drbd_do_features(struct drbd_connection *connection);
67 static int drbd_do_auth(struct drbd_connection *connection);
68 static int drbd_disconnected(struct drbd_peer_device *);
69 static void conn_wait_active_ee_empty(struct drbd_connection *connection);
70 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
71 static int e_end_block(struct drbd_work *, int);
72
73
74 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
75
76 /*
77  * some helper functions to deal with single linked page lists,
78  * page->private being our "next" pointer.
79  */
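80
81 /* Note: the chain helpers used below (page_chain_next(),
82  * page_chain_for_each_safe()) live in drbd_int.h.  Purely as an illustration
83  * of the data structure (not the authoritative definitions), they amount to
84  * roughly:
85  *
86  *      #define page_chain_next(page) \
87  *              ((struct page *)page_private(page))
88  *      #define page_chain_for_each_safe(page, n) \
89  *              for (; page && ({ n = page_chain_next(page); 1; }); page = n)
90  *
91  * i.e. a NULL-terminated singly linked list threaded through page->private.
92  */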
80
81 /* If at least n pages are linked at head, get n pages off.
82  * Otherwise, don't modify head, and return NULL.
83  * Locking is the responsibility of the caller.
84  */
85 static struct page *page_chain_del(struct page **head, int n)
86 {
87         struct page *page;
88         struct page *tmp;
89
90         BUG_ON(!n);
91         BUG_ON(!head);
92
93         page = *head;
94
95         if (!page)
96                 return NULL;
97
98         while (page) {
99                 tmp = page_chain_next(page);
100                 if (--n == 0)
101                         break; /* found sufficient pages */
102                 if (tmp == NULL)
103                         /* insufficient pages, don't use any of them. */
104                         return NULL;
105                 page = tmp;
106         }
107
108         /* add end of list marker for the returned list */
109         set_page_private(page, 0);
110         /* actual return value, and adjustment of head */
111         page = *head;
112         *head = tmp;
113         return page;
114 }
115
116 /* may be used outside of locks to find the tail of a (usually short)
117  * "private" page chain, before adding it back to a global chain head
118  * with page_chain_add() under a spinlock. */
119 static struct page *page_chain_tail(struct page *page, int *len)
120 {
121         struct page *tmp;
122         int i = 1;
123         while ((tmp = page_chain_next(page)))
124                 ++i, page = tmp;
125         if (len)
126                 *len = i;
127         return page;
128 }
129
130 static int page_chain_free(struct page *page)
131 {
132         struct page *tmp;
133         int i = 0;
134         page_chain_for_each_safe(page, tmp) {
135                 put_page(page);
136                 ++i;
137         }
138         return i;
139 }
140
141 static void page_chain_add(struct page **head,
142                 struct page *chain_first, struct page *chain_last)
143 {
144 #if 1
145         struct page *tmp;
146         tmp = page_chain_tail(chain_first, NULL);
147         BUG_ON(tmp != chain_last);
148 #endif
149
150         /* add chain to head */
151         set_page_private(chain_last, (unsigned long)*head);
152         *head = chain_first;
153 }
154
155 static struct page *__drbd_alloc_pages(struct drbd_device *device,
156                                        unsigned int number)
157 {
158         struct page *page = NULL;
159         struct page *tmp = NULL;
160         unsigned int i = 0;
161
162         /* Yes, testing drbd_pp_vacant outside the lock is racy.
163          * So what. It saves a spin_lock. */
164         if (drbd_pp_vacant >= number) {
165                 spin_lock(&drbd_pp_lock);
166                 page = page_chain_del(&drbd_pp_pool, number);
167                 if (page)
168                         drbd_pp_vacant -= number;
169                 spin_unlock(&drbd_pp_lock);
170                 if (page)
171                         return page;
172         }
173
174         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
175          * "criss-cross" setup, that might cause write-out on some other DRBD,
176          * which in turn might block on the other node at this very place.  */
177         for (i = 0; i < number; i++) {
178                 tmp = alloc_page(GFP_TRY);
179                 if (!tmp)
180                         break;
181                 set_page_private(tmp, (unsigned long)page);
182                 page = tmp;
183         }
184
185         if (i == number)
186                 return page;
187
188         /* Not enough pages immediately available this time.
189          * No need to jump around here, drbd_alloc_pages will retry this
190          * function "soon". */
191         if (page) {
192                 tmp = page_chain_tail(page, NULL);
193                 spin_lock(&drbd_pp_lock);
194                 page_chain_add(&drbd_pp_pool, page, tmp);
195                 drbd_pp_vacant += i;
196                 spin_unlock(&drbd_pp_lock);
197         }
198         return NULL;
199 }
200
201 static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
202                                            struct list_head *to_be_freed)
203 {
204         struct drbd_peer_request *peer_req, *tmp;
205
206         /* The EEs are always appended to the end of the list. Since
207            they are sent in order over the wire, they have to finish
208            in order. As soon as we see the first unfinished one, we can
209            stop examining the list... */
210
211         list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
212                 if (drbd_peer_req_has_active_page(peer_req))
213                         break;
214                 list_move(&peer_req->w.list, to_be_freed);
215         }
216 }
217
218 static void drbd_reclaim_net_peer_reqs(struct drbd_device *device)
219 {
220         LIST_HEAD(reclaimed);
221         struct drbd_peer_request *peer_req, *t;
222
223         spin_lock_irq(&device->resource->req_lock);
224         reclaim_finished_net_peer_reqs(device, &reclaimed);
225         spin_unlock_irq(&device->resource->req_lock);
226         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
227                 drbd_free_net_peer_req(device, peer_req);
228 }
229
230 static void conn_reclaim_net_peer_reqs(struct drbd_connection *connection)
231 {
232         struct drbd_peer_device *peer_device;
233         int vnr;
234
235         rcu_read_lock();
236         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
237                 struct drbd_device *device = peer_device->device;
238                 if (!atomic_read(&device->pp_in_use_by_net))
239                         continue;
240
241                 kref_get(&device->kref);
242                 rcu_read_unlock();
243                 drbd_reclaim_net_peer_reqs(device);
244                 kref_put(&device->kref, drbd_destroy_device);
245                 rcu_read_lock();
246         }
247         rcu_read_unlock();
248 }
249
250 /**
251  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
252  * @peer_device: DRBD peer device; pages are accounted on its device.
253  * @number:     number of pages requested
254  * @retry:      whether to retry, if not enough pages are available right now
255  *
256  * Tries to allocate @number pages, first from our own page pool, then from
257  * the kernel.
258  * Possibly retry until DRBD frees sufficient pages somewhere else.
259  *
260  * If this allocation would exceed the max_buffers setting, we throttle
261  * allocation (schedule_timeout) to give the system some room to breathe.
262  *
263  * We do not use max-buffers as hard limit, because it could lead to
264  * congestion and further to a distributed deadlock during online-verify or
265  * (checksum based) resync, if the max-buffers, socket buffer sizes and
266  * resync-rate settings are mis-configured.
267  *
268  * Returns a page chain linked via page->private.
269  */
270 struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
271                               bool retry)
272 {
273         struct drbd_device *device = peer_device->device;
274         struct page *page = NULL;
275         struct net_conf *nc;
276         DEFINE_WAIT(wait);
277         unsigned int mxb;
278
279         rcu_read_lock();
280         nc = rcu_dereference(peer_device->connection->net_conf);
281         mxb = nc ? nc->max_buffers : 1000000;
282         rcu_read_unlock();
283
284         if (atomic_read(&device->pp_in_use) < mxb)
285                 page = __drbd_alloc_pages(device, number);
286
287         /* Try to keep the fast path fast, but occasionally we need
288          * to reclaim the pages we lent to the network stack. */
289         if (page && atomic_read(&device->pp_in_use_by_net) > 512)
290                 drbd_reclaim_net_peer_reqs(device);
291
292         while (page == NULL) {
293                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
294
295                 drbd_reclaim_net_peer_reqs(device);
296
297                 if (atomic_read(&device->pp_in_use) < mxb) {
298                         page = __drbd_alloc_pages(device, number);
299                         if (page)
300                                 break;
301                 }
302
303                 if (!retry)
304                         break;
305
306                 if (signal_pending(current)) {
307                         drbd_warn(device, "drbd_alloc_pages interrupted!\n");
308                         break;
309                 }
310
311                 if (schedule_timeout(HZ/10) == 0)
312                         mxb = UINT_MAX;
313         }
314         finish_wait(&drbd_pp_wait, &wait);
315
316         if (page)
317                 atomic_add(number, &device->pp_in_use);
318         return page;
319 }
320
321 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
322  * Is also used from inside another spin_lock_irq(&resource->req_lock);
323  * Either links the page chain back to the global pool,
324  * or returns all pages to the system. */
325 static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
326 {
327         atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
328         int i;
329
330         if (page == NULL)
331                 return;
332
333         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
334                 i = page_chain_free(page);
335         else {
336                 struct page *tmp;
337                 tmp = page_chain_tail(page, &i);
338                 spin_lock(&drbd_pp_lock);
339                 page_chain_add(&drbd_pp_pool, page, tmp);
340                 drbd_pp_vacant += i;
341                 spin_unlock(&drbd_pp_lock);
342         }
343         i = atomic_sub_return(i, a);
344         if (i < 0)
345                 drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
346                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
347         wake_up(&drbd_pp_wait);
348 }
349
350 /*
351 You need to hold the req_lock:
352  _drbd_wait_ee_list_empty()
353
354 You must not have the req_lock:
355  drbd_free_peer_req()
356  drbd_alloc_peer_req()
357  drbd_free_peer_reqs()
358  drbd_ee_fix_bhs()
359  drbd_finish_peer_reqs()
360  drbd_clear_done_ee()
361  drbd_wait_ee_list_empty()
362 */
363
364 /* normal: payload_size == request size (bi_size)
365  * w_same: payload_size == logical_block_size
366  * trim: payload_size == 0 */
367 struct drbd_peer_request *
368 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
369                     unsigned int request_size, unsigned int payload_size, gfp_t gfp_mask) __must_hold(local)
370 {
371         struct drbd_device *device = peer_device->device;
372         struct drbd_peer_request *peer_req;
373         struct page *page = NULL;
374         unsigned nr_pages = (payload_size + PAGE_SIZE -1) >> PAGE_SHIFT;
375
376         if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
377                 return NULL;
378
379         peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
380         if (!peer_req) {
381                 if (!(gfp_mask & __GFP_NOWARN))
382                         drbd_err(device, "%s: allocation failed\n", __func__);
383                 return NULL;
384         }
385
386         if (nr_pages) {
387                 page = drbd_alloc_pages(peer_device, nr_pages,
388                                         gfpflags_allow_blocking(gfp_mask));
389                 if (!page)
390                         goto fail;
391         }
392
393         memset(peer_req, 0, sizeof(*peer_req));
394         INIT_LIST_HEAD(&peer_req->w.list);
395         drbd_clear_interval(&peer_req->i);
396         peer_req->i.size = request_size;
397         peer_req->i.sector = sector;
398         peer_req->submit_jif = jiffies;
399         peer_req->peer_device = peer_device;
400         peer_req->pages = page;
401         /*
402          * The block_id is opaque to the receiver.  It is not endianness
403          * converted, and sent back to the sender unchanged.
404          */
405         peer_req->block_id = id;
406
407         return peer_req;
408
409  fail:
410         mempool_free(peer_req, drbd_ee_mempool);
411         return NULL;
412 }
413
414 void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
415                        int is_net)
416 {
417         might_sleep();
418         if (peer_req->flags & EE_HAS_DIGEST)
419                 kfree(peer_req->digest);
420         drbd_free_pages(device, peer_req->pages, is_net);
421         D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
422         D_ASSERT(device, drbd_interval_empty(&peer_req->i));
423         if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
424                 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
425                 drbd_al_complete_io(device, &peer_req->i);
426         }
427         mempool_free(peer_req, drbd_ee_mempool);
428 }
429
430 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
431 {
432         LIST_HEAD(work_list);
433         struct drbd_peer_request *peer_req, *t;
434         int count = 0;
435         int is_net = list == &device->net_ee;
436
437         spin_lock_irq(&device->resource->req_lock);
438         list_splice_init(list, &work_list);
439         spin_unlock_irq(&device->resource->req_lock);
440
441         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
442                 __drbd_free_peer_req(device, peer_req, is_net);
443                 count++;
444         }
445         return count;
446 }
447
448 /*
449  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
450  */
451 static int drbd_finish_peer_reqs(struct drbd_device *device)
452 {
453         LIST_HEAD(work_list);
454         LIST_HEAD(reclaimed);
455         struct drbd_peer_request *peer_req, *t;
456         int err = 0;
457
458         spin_lock_irq(&device->resource->req_lock);
459         reclaim_finished_net_peer_reqs(device, &reclaimed);
460         list_splice_init(&device->done_ee, &work_list);
461         spin_unlock_irq(&device->resource->req_lock);
462
463         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
464                 drbd_free_net_peer_req(device, peer_req);
465
466         /* possible callbacks here:
467          * e_end_block, e_end_resync_block, and e_send_superseded.
468          * all ignore the last argument.
469          */
470         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
471                 int err2;
472
473                 /* list_del not necessary, next/prev members not touched */
474                 err2 = peer_req->w.cb(&peer_req->w, !!err);
475                 if (!err)
476                         err = err2;
477                 drbd_free_peer_req(device, peer_req);
478         }
479         wake_up(&device->ee_wait);
480
481         return err;
482 }
483
484 static void _drbd_wait_ee_list_empty(struct drbd_device *device,
485                                      struct list_head *head)
486 {
487         DEFINE_WAIT(wait);
488
489         /* avoids spin_lock/unlock
490          * and calling prepare_to_wait in the fast path */
491         while (!list_empty(head)) {
492                 prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
493                 spin_unlock_irq(&device->resource->req_lock);
494                 io_schedule();
495                 finish_wait(&device->ee_wait, &wait);
496                 spin_lock_irq(&device->resource->req_lock);
497         }
498 }
499
500 static void drbd_wait_ee_list_empty(struct drbd_device *device,
501                                     struct list_head *head)
502 {
503         spin_lock_irq(&device->resource->req_lock);
504         _drbd_wait_ee_list_empty(device, head);
505         spin_unlock_irq(&device->resource->req_lock);
506 }
507
508 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
509 {
510         struct kvec iov = {
511                 .iov_base = buf,
512                 .iov_len = size,
513         };
514         struct msghdr msg = {
515                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
516         };
517         return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags);
518 }
519
520 static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
521 {
522         int rv;
523
524         rv = drbd_recv_short(connection->data.socket, buf, size, 0);
525
526         if (rv < 0) {
527                 if (rv == -ECONNRESET)
528                         drbd_info(connection, "sock was reset by peer\n");
529                 else if (rv != -ERESTARTSYS)
530                         drbd_err(connection, "sock_recvmsg returned %d\n", rv);
531         } else if (rv == 0) {
532                 if (test_bit(DISCONNECT_SENT, &connection->flags)) {
533                         long t;
534                         rcu_read_lock();
535                         t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
536                         rcu_read_unlock();
537
538                         t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
539
540                         if (t)
541                                 goto out;
542                 }
543                 drbd_info(connection, "sock was shut down by peer\n");
544         }
545
546         if (rv != size)
547                 conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
548
549 out:
550         return rv;
551 }
552
553 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
554 {
555         int err;
556
557         err = drbd_recv(connection, buf, size);
558         if (err != size) {
559                 if (err >= 0)
560                         err = -EIO;
561         } else
562                 err = 0;
563         return err;
564 }
565
566 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
567 {
568         int err;
569
570         err = drbd_recv_all(connection, buf, size);
571         if (err && !signal_pending(current))
572                 drbd_warn(connection, "short read (expected size %d)\n", (int)size);
573         return err;
574 }
575
576 /* quoting tcp(7):
577  *   On individual connections, the socket buffer size must be set prior to the
578  *   listen(2) or connect(2) calls in order to have it take effect.
579  * This is our wrapper to do so.
580  */
581 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
582                 unsigned int rcv)
583 {
584         /* open coded SO_SNDBUF, SO_RCVBUF */
585         if (snd) {
586                 sock->sk->sk_sndbuf = snd;
587                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
588         }
589         if (rcv) {
590                 sock->sk->sk_rcvbuf = rcv;
591                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
592         }
593 }
594
595 static struct socket *drbd_try_connect(struct drbd_connection *connection)
596 {
597         const char *what;
598         struct socket *sock;
599         struct sockaddr_in6 src_in6;
600         struct sockaddr_in6 peer_in6;
601         struct net_conf *nc;
602         int err, peer_addr_len, my_addr_len;
603         int sndbuf_size, rcvbuf_size, connect_int;
604         int disconnect_on_error = 1;
605
606         rcu_read_lock();
607         nc = rcu_dereference(connection->net_conf);
608         if (!nc) {
609                 rcu_read_unlock();
610                 return NULL;
611         }
612         sndbuf_size = nc->sndbuf_size;
613         rcvbuf_size = nc->rcvbuf_size;
614         connect_int = nc->connect_int;
615         rcu_read_unlock();
616
617         my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
618         memcpy(&src_in6, &connection->my_addr, my_addr_len);
619
620         if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
621                 src_in6.sin6_port = 0;
622         else
623                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
624
625         peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
626         memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);
627
628         what = "sock_create_kern";
629         err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
630                                SOCK_STREAM, IPPROTO_TCP, &sock);
631         if (err < 0) {
632                 sock = NULL;
633                 goto out;
634         }
635
636         sock->sk->sk_rcvtimeo =
637         sock->sk->sk_sndtimeo = connect_int * HZ;
638         drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
639
640        /* explicitly bind to the configured IP as source IP
641         *  for the outgoing connections.
642         *  This is needed for multihomed hosts and to be
643         *  able to use lo: interfaces for drbd.
644         * Make sure to use 0 as port number, so linux selects
645         *  a free one dynamically.
646         */
647         what = "bind before connect";
648         err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
649         if (err < 0)
650                 goto out;
651
652         /* connect may fail, peer not yet available.
653          * stay C_WF_CONNECTION, don't go Disconnecting! */
654         disconnect_on_error = 0;
655         what = "connect";
656         err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
657
658 out:
659         if (err < 0) {
660                 if (sock) {
661                         sock_release(sock);
662                         sock = NULL;
663                 }
664                 switch (-err) {
665                         /* timeout, busy, signal pending */
666                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
667                 case EINTR: case ERESTARTSYS:
668                         /* peer not (yet) available, network problem */
669                 case ECONNREFUSED: case ENETUNREACH:
670                 case EHOSTDOWN:    case EHOSTUNREACH:
671                         disconnect_on_error = 0;
672                         break;
673                 default:
674                         drbd_err(connection, "%s failed, err = %d\n", what, err);
675                 }
676                 if (disconnect_on_error)
677                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
678         }
679
680         return sock;
681 }
682
683 struct accept_wait_data {
684         struct drbd_connection *connection;
685         struct socket *s_listen;
686         struct completion door_bell;
687         void (*original_sk_state_change)(struct sock *sk);
688
689 };
690
691 static void drbd_incoming_connection(struct sock *sk)
692 {
693         struct accept_wait_data *ad = sk->sk_user_data;
694         void (*state_change)(struct sock *sk);
695
696         state_change = ad->original_sk_state_change;
697         if (sk->sk_state == TCP_ESTABLISHED)
698                 complete(&ad->door_bell);
699         state_change(sk);
700 }
701
702 static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
703 {
704         int err, sndbuf_size, rcvbuf_size, my_addr_len;
705         struct sockaddr_in6 my_addr;
706         struct socket *s_listen;
707         struct net_conf *nc;
708         const char *what;
709
710         rcu_read_lock();
711         nc = rcu_dereference(connection->net_conf);
712         if (!nc) {
713                 rcu_read_unlock();
714                 return -EIO;
715         }
716         sndbuf_size = nc->sndbuf_size;
717         rcvbuf_size = nc->rcvbuf_size;
718         rcu_read_unlock();
719
720         my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
721         memcpy(&my_addr, &connection->my_addr, my_addr_len);
722
723         what = "sock_create_kern";
724         err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
725                                SOCK_STREAM, IPPROTO_TCP, &s_listen);
726         if (err) {
727                 s_listen = NULL;
728                 goto out;
729         }
730
731         s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
732         drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
733
734         what = "bind before listen";
735         err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
736         if (err < 0)
737                 goto out;
738
739         ad->s_listen = s_listen;
740         write_lock_bh(&s_listen->sk->sk_callback_lock);
741         ad->original_sk_state_change = s_listen->sk->sk_state_change;
742         s_listen->sk->sk_state_change = drbd_incoming_connection;
743         s_listen->sk->sk_user_data = ad;
744         write_unlock_bh(&s_listen->sk->sk_callback_lock);
745
746         what = "listen";
747         err = s_listen->ops->listen(s_listen, 5);
748         if (err < 0)
749                 goto out;
750
751         return 0;
752 out:
753         if (s_listen)
754                 sock_release(s_listen);
755         if (err < 0) {
756                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
757                         drbd_err(connection, "%s failed, err = %d\n", what, err);
758                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
759                 }
760         }
761
762         return -EIO;
763 }
764
765 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
766 {
767         write_lock_bh(&sk->sk_callback_lock);
768         sk->sk_state_change = ad->original_sk_state_change;
769         sk->sk_user_data = NULL;
770         write_unlock_bh(&sk->sk_callback_lock);
771 }
772
773 static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
774 {
775         int timeo, connect_int, err = 0;
776         struct socket *s_estab = NULL;
777         struct net_conf *nc;
778
779         rcu_read_lock();
780         nc = rcu_dereference(connection->net_conf);
781         if (!nc) {
782                 rcu_read_unlock();
783                 return NULL;
784         }
785         connect_int = nc->connect_int;
786         rcu_read_unlock();
787
788         timeo = connect_int * HZ;
789         /* 28.5% random jitter */
790         timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
791
792         err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
793         if (err <= 0)
794                 return NULL;
795
796         err = kernel_accept(ad->s_listen, &s_estab, 0);
797         if (err < 0) {
798                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
799                         drbd_err(connection, "accept failed, err = %d\n", err);
800                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
801                 }
802         }
803
804         if (s_estab)
805                 unregister_state_change(s_estab->sk, ad);
806
807         return s_estab;
808 }
809
810 static int decode_header(struct drbd_connection *, void *, struct packet_info *);
811
812 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
813                              enum drbd_packet cmd)
814 {
815         if (!conn_prepare_command(connection, sock))
816                 return -EIO;
817         return conn_send_command(connection, sock, cmd, 0, NULL, 0);
818 }
819
820 static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
821 {
822         unsigned int header_size = drbd_header_size(connection);
823         struct packet_info pi;
824         struct net_conf *nc;
825         int err;
826
827         rcu_read_lock();
828         nc = rcu_dereference(connection->net_conf);
829         if (!nc) {
830                 rcu_read_unlock();
831                 return -EIO;
832         }
833         sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
834         rcu_read_unlock();
835
836         err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
837         if (err != header_size) {
838                 if (err >= 0)
839                         err = -EIO;
840                 return err;
841         }
842         err = decode_header(connection, connection->data.rbuf, &pi);
843         if (err)
844                 return err;
845         return pi.cmd;
846 }
847
848 /**
849  * drbd_socket_okay() - Free the socket if its connection is not okay
850  * @sock:       pointer to the pointer to the socket.
851  */
852 static bool drbd_socket_okay(struct socket **sock)
853 {
854         int rr;
855         char tb[4];
856
857         if (!*sock)
858                 return false;
859
860         rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
861
862         if (rr > 0 || rr == -EAGAIN) {
863                 return true;
864         } else {
865                 sock_release(*sock);
866                 *sock = NULL;
867                 return false;
868         }
869 }
870
871 static bool connection_established(struct drbd_connection *connection,
872                                    struct socket **sock1,
873                                    struct socket **sock2)
874 {
875         struct net_conf *nc;
876         int timeout;
877         bool ok;
878
879         if (!*sock1 || !*sock2)
880                 return false;
881
882         rcu_read_lock();
883         nc = rcu_dereference(connection->net_conf);
884         timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
885         rcu_read_unlock();
886         schedule_timeout_interruptible(timeout);
887
888         ok = drbd_socket_okay(sock1);
889         ok = drbd_socket_okay(sock2) && ok;
890
891         return ok;
892 }
893
894 /* Gets called if a connection is established, or if a new minor gets created
895    in a connection */
896 int drbd_connected(struct drbd_peer_device *peer_device)
897 {
898         struct drbd_device *device = peer_device->device;
899         int err;
900
901         atomic_set(&device->packet_seq, 0);
902         device->peer_seq = 0;
903
904         device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
905                 &peer_device->connection->cstate_mutex :
906                 &device->own_state_mutex;
907
908         err = drbd_send_sync_param(peer_device);
909         if (!err)
910                 err = drbd_send_sizes(peer_device, 0, 0);
911         if (!err)
912                 err = drbd_send_uuids(peer_device);
913         if (!err)
914                 err = drbd_send_current_state(peer_device);
915         clear_bit(USE_DEGR_WFC_T, &device->flags);
916         clear_bit(RESIZE_PENDING, &device->flags);
917         atomic_set(&device->ap_in_flight, 0);
918         mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
919         return err;
920 }
921
922 /*
923  * return values:
924  *   1 yes, we have a valid connection
925  *   0 oops, did not work out, please try again
926  *  -1 peer talks different language,
927  *     no point in trying again, please go standalone.
928  *  -2 We do not have a network config...
929  */
930 static int conn_connect(struct drbd_connection *connection)
931 {
932         struct drbd_socket sock, msock;
933         struct drbd_peer_device *peer_device;
934         struct net_conf *nc;
935         int vnr, timeout, h;
936         bool discard_my_data, ok;
937         enum drbd_state_rv rv;
938         struct accept_wait_data ad = {
939                 .connection = connection,
940                 .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
941         };
942
943         clear_bit(DISCONNECT_SENT, &connection->flags);
944         if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
945                 return -2;
946
947         mutex_init(&sock.mutex);
948         sock.sbuf = connection->data.sbuf;
949         sock.rbuf = connection->data.rbuf;
950         sock.socket = NULL;
951         mutex_init(&msock.mutex);
952         msock.sbuf = connection->meta.sbuf;
953         msock.rbuf = connection->meta.rbuf;
954         msock.socket = NULL;
955
956         /* Assume that the peer only understands protocol 80 until we know better.  */
957         connection->agreed_pro_version = 80;
958
959         if (prepare_listen_socket(connection, &ad))
960                 return 0;
961
962         do {
963                 struct socket *s;
964
965                 s = drbd_try_connect(connection);
966                 if (s) {
967                         if (!sock.socket) {
968                                 sock.socket = s;
969                                 send_first_packet(connection, &sock, P_INITIAL_DATA);
970                         } else if (!msock.socket) {
971                                 clear_bit(RESOLVE_CONFLICTS, &connection->flags);
972                                 msock.socket = s;
973                                 send_first_packet(connection, &msock, P_INITIAL_META);
974                         } else {
975                                 drbd_err(connection, "Logic error in conn_connect()\n");
976                                 goto out_release_sockets;
977                         }
978                 }
979
980                 if (connection_established(connection, &sock.socket, &msock.socket))
981                         break;
982
983 retry:
984                 s = drbd_wait_for_connect(connection, &ad);
985                 if (s) {
986                         int fp = receive_first_packet(connection, s);
987                         drbd_socket_okay(&sock.socket);
988                         drbd_socket_okay(&msock.socket);
989                         switch (fp) {
990                         case P_INITIAL_DATA:
991                                 if (sock.socket) {
992                                         drbd_warn(connection, "initial packet S crossed\n");
993                                         sock_release(sock.socket);
994                                         sock.socket = s;
995                                         goto randomize;
996                                 }
997                                 sock.socket = s;
998                                 break;
999                         case P_INITIAL_META:
1000                                 set_bit(RESOLVE_CONFLICTS, &connection->flags);
1001                                 if (msock.socket) {
1002                                         drbd_warn(connection, "initial packet M crossed\n");
1003                                         sock_release(msock.socket);
1004                                         msock.socket = s;
1005                                         goto randomize;
1006                                 }
1007                                 msock.socket = s;
1008                                 break;
1009                         default:
1010                                 drbd_warn(connection, "Error receiving initial packet\n");
1011                                 sock_release(s);
1012 randomize:
1013                                 if (prandom_u32() & 1)
1014                                         goto retry;
1015                         }
1016                 }
1017
1018                 if (connection->cstate <= C_DISCONNECTING)
1019                         goto out_release_sockets;
1020                 if (signal_pending(current)) {
1021                         flush_signals(current);
1022                         smp_rmb();
1023                         if (get_t_state(&connection->receiver) == EXITING)
1024                                 goto out_release_sockets;
1025                 }
1026
1027                 ok = connection_established(connection, &sock.socket, &msock.socket);
1028         } while (!ok);
1029
1030         if (ad.s_listen)
1031                 sock_release(ad.s_listen);
1032
1033         sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1034         msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1035
1036         sock.socket->sk->sk_allocation = GFP_NOIO;
1037         msock.socket->sk->sk_allocation = GFP_NOIO;
1038
1039         sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
1040         msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
1041
1042         /* NOT YET ...
1043          * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
1044          * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1045          * first set it to the P_CONNECTION_FEATURES timeout,
1046          * which we set to 4x the configured ping_timeout. */
1047         rcu_read_lock();
1048         nc = rcu_dereference(connection->net_conf);
1049
1050         sock.socket->sk->sk_sndtimeo =
1051         sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
1052
1053         msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1054         timeout = nc->timeout * HZ / 10;
1055         discard_my_data = nc->discard_my_data;
1056         rcu_read_unlock();
1057
1058         msock.socket->sk->sk_sndtimeo = timeout;
1059
1060         /* we don't want delays.
1061          * we use TCP_CORK where appropriate, though */
1062         drbd_tcp_nodelay(sock.socket);
1063         drbd_tcp_nodelay(msock.socket);
1064
1065         connection->data.socket = sock.socket;
1066         connection->meta.socket = msock.socket;
1067         connection->last_received = jiffies;
1068
1069         h = drbd_do_features(connection);
1070         if (h <= 0)
1071                 return h;
1072
1073         if (connection->cram_hmac_tfm) {
1074                 /* drbd_request_state(device, NS(conn, WFAuth)); */
1075                 switch (drbd_do_auth(connection)) {
1076                 case -1:
1077                         drbd_err(connection, "Authentication of peer failed\n");
1078                         return -1;
1079                 case 0:
1080                         drbd_err(connection, "Authentication of peer failed, trying again.\n");
1081                         return 0;
1082                 }
1083         }
1084
1085         connection->data.socket->sk->sk_sndtimeo = timeout;
1086         connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1087
1088         if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1089                 return -1;
1090
1091         /* Prevent a race between resync-handshake and
1092          * being promoted to Primary.
1093          *
1094          * Grab and release the state mutex, so we know that any current
1095          * drbd_set_role() is finished, and any incoming drbd_set_role
1096          * will see the STATE_SENT flag, and wait for it to be cleared.
1097          */
1098         idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1099                 mutex_lock(peer_device->device->state_mutex);
1100
1101         set_bit(STATE_SENT, &connection->flags);
1102
1103         idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1104                 mutex_unlock(peer_device->device->state_mutex);
1105
1106         rcu_read_lock();
1107         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1108                 struct drbd_device *device = peer_device->device;
1109                 kref_get(&device->kref);
1110                 rcu_read_unlock();
1111
1112                 if (discard_my_data)
1113                         set_bit(DISCARD_MY_DATA, &device->flags);
1114                 else
1115                         clear_bit(DISCARD_MY_DATA, &device->flags);
1116
1117                 drbd_connected(peer_device);
1118                 kref_put(&device->kref, drbd_destroy_device);
1119                 rcu_read_lock();
1120         }
1121         rcu_read_unlock();
1122
1123         rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1124         if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1125                 clear_bit(STATE_SENT, &connection->flags);
1126                 return 0;
1127         }
1128
1129         drbd_thread_start(&connection->ack_receiver);
1130         /* opencoded create_singlethread_workqueue(),
1131          * to be able to use format string arguments */
1132         connection->ack_sender =
1133                 alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name);
1134         if (!connection->ack_sender) {
1135                 drbd_err(connection, "Failed to create workqueue ack_sender\n");
1136                 return 0;
1137         }
1138
1139         mutex_lock(&connection->resource->conf_update);
1140         /* The discard_my_data flag is a single-shot modifier to the next
1141          * connection attempt, the handshake of which is now well underway.
1142          * No need for rcu style copying of the whole struct
1143          * just to clear a single value. */
1144         connection->net_conf->discard_my_data = 0;
1145         mutex_unlock(&connection->resource->conf_update);
1146
1147         return h;
1148
1149 out_release_sockets:
1150         if (ad.s_listen)
1151                 sock_release(ad.s_listen);
1152         if (sock.socket)
1153                 sock_release(sock.socket);
1154         if (msock.socket)
1155                 sock_release(msock.socket);
1156         return -1;
1157 }
1158
1159 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1160 {
1161         unsigned int header_size = drbd_header_size(connection);
1162
1163         if (header_size == sizeof(struct p_header100) &&
1164             *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1165                 struct p_header100 *h = header;
1166                 if (h->pad != 0) {
1167                         drbd_err(connection, "Header padding is not zero\n");
1168                         return -EINVAL;
1169                 }
1170                 pi->vnr = be16_to_cpu(h->volume);
1171                 pi->cmd = be16_to_cpu(h->command);
1172                 pi->size = be32_to_cpu(h->length);
1173         } else if (header_size == sizeof(struct p_header95) &&
1174                    *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1175                 struct p_header95 *h = header;
1176                 pi->cmd = be16_to_cpu(h->command);
1177                 pi->size = be32_to_cpu(h->length);
1178                 pi->vnr = 0;
1179         } else if (header_size == sizeof(struct p_header80) &&
1180                    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1181                 struct p_header80 *h = header;
1182                 pi->cmd = be16_to_cpu(h->command);
1183                 pi->size = be16_to_cpu(h->length);
1184                 pi->vnr = 0;
1185         } else {
1186                 drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1187                          be32_to_cpu(*(__be32 *)header),
1188                          connection->agreed_pro_version);
1189                 return -EINVAL;
1190         }
1191         pi->data = header + header_size;
1192         return 0;
1193 }
1194
1195 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1196 {
1197         void *buffer = connection->data.rbuf;
1198         int err;
1199
1200         err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1201         if (err)
1202                 return err;
1203
1204         err = decode_header(connection, buffer, pi);
1205         connection->last_received = jiffies;
1206
1207         return err;
1208 }
1209
1210 /* This is blkdev_issue_flush, but asynchronous.
1211  * We want to submit to all component volumes in parallel,
1212  * then wait for all completions.
1213  */
1214 struct issue_flush_context {
1215         atomic_t pending;
1216         int error;
1217         struct completion done;
1218 };
1219 struct one_flush_context {
1220         struct drbd_device *device;
1221         struct issue_flush_context *ctx;
1222 };
1223
1224 void one_flush_endio(struct bio *bio)
1225 {
1226         struct one_flush_context *octx = bio->bi_private;
1227         struct drbd_device *device = octx->device;
1228         struct issue_flush_context *ctx = octx->ctx;
1229
1230         if (bio->bi_error) {
1231                 ctx->error = bio->bi_error;
1232                 drbd_info(device, "local disk FLUSH FAILED with status %d\n", bio->bi_error);
1233         }
1234         kfree(octx);
1235         bio_put(bio);
1236
1237         clear_bit(FLUSH_PENDING, &device->flags);
1238         put_ldev(device);
1239         kref_put(&device->kref, drbd_destroy_device);
1240
1241         if (atomic_dec_and_test(&ctx->pending))
1242                 complete(&ctx->done);
1243 }
1244
1245 static void submit_one_flush(struct drbd_device *device, struct issue_flush_context *ctx)
1246 {
1247         struct bio *bio = bio_alloc(GFP_NOIO, 0);
1248         struct one_flush_context *octx = kmalloc(sizeof(*octx), GFP_NOIO);
1249         if (!bio || !octx) {
1250                 drbd_warn(device, "Could not allocate a bio, CANNOT ISSUE FLUSH\n");
1251                 /* FIXME: what else can I do now?  disconnecting or detaching
1252                  * really does not help to improve the state of the world, either.
1253                  */
1254                 kfree(octx);
1255                 if (bio)
1256                         bio_put(bio);
1257
1258                 ctx->error = -ENOMEM;
1259                 put_ldev(device);
1260                 kref_put(&device->kref, drbd_destroy_device);
1261                 return;
1262         }
1263
1264         octx->device = device;
1265         octx->ctx = ctx;
1266         bio->bi_bdev = device->ldev->backing_bdev;
1267         bio->bi_private = octx;
1268         bio->bi_end_io = one_flush_endio;
1269         bio_set_op_attrs(bio, REQ_OP_FLUSH, WRITE_FLUSH);
1270
1271         device->flush_jif = jiffies;
1272         set_bit(FLUSH_PENDING, &device->flags);
1273         atomic_inc(&ctx->pending);
1274         submit_bio(bio);
1275 }
1276
1277 static void drbd_flush(struct drbd_connection *connection)
1278 {
1279         if (connection->resource->write_ordering >= WO_BDEV_FLUSH) {
1280                 struct drbd_peer_device *peer_device;
1281                 struct issue_flush_context ctx;
1282                 int vnr;
1283
1284                 atomic_set(&ctx.pending, 1);
1285                 ctx.error = 0;
1286                 init_completion(&ctx.done);
1287
1288                 rcu_read_lock();
1289                 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1290                         struct drbd_device *device = peer_device->device;
1291
1292                         if (!get_ldev(device))
1293                                 continue;
1294                         kref_get(&device->kref);
1295                         rcu_read_unlock();
1296
1297                         submit_one_flush(device, &ctx);
1298
1299                         rcu_read_lock();
1300                 }
1301                 rcu_read_unlock();
1302
1303                 /* Do we want to add a timeout,
1304                  * if disk-timeout is set? */
1305                 if (!atomic_dec_and_test(&ctx.pending))
1306                         wait_for_completion(&ctx.done);
1307
1308                 if (ctx.error) {
1309                         /* would rather check on EOPNOTSUPP, but that is not reliable.
1310                          * don't try again for ANY return value != 0
1311                          * if (rv == -EOPNOTSUPP) */
1312                         /* Any error is already reported by bio_endio callback. */
1313                         drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
1314                 }
1315         }
1316 }
1317
1318 /**
1319  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishes it.
1320  * @connection: DRBD connection.
1321  * @epoch:      Epoch object.
1322  * @ev:         Epoch event.
1323  */
1324 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1325                                                struct drbd_epoch *epoch,
1326                                                enum epoch_event ev)
1327 {
1328         int epoch_size;
1329         struct drbd_epoch *next_epoch;
1330         enum finish_epoch rv = FE_STILL_LIVE;
1331
1332         spin_lock(&connection->epoch_lock);
1333         do {
1334                 next_epoch = NULL;
1335
1336                 epoch_size = atomic_read(&epoch->epoch_size);
1337
1338                 switch (ev & ~EV_CLEANUP) {
1339                 case EV_PUT:
1340                         atomic_dec(&epoch->active);
1341                         break;
1342                 case EV_GOT_BARRIER_NR:
1343                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1344                         break;
1345                 case EV_BECAME_LAST:
1346                         /* nothing to do */
1347                         break;
1348                 }
1349
1350                 if (epoch_size != 0 &&
1351                     atomic_read(&epoch->active) == 0 &&
1352                     (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1353                         if (!(ev & EV_CLEANUP)) {
1354                                 spin_unlock(&connection->epoch_lock);
1355                                 drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1356                                 spin_lock(&connection->epoch_lock);
1357                         }
1358 #if 0
1359                         /* FIXME: dec unacked on connection, once we have
1360                          * something to count pending connection packets in. */
1361                         if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1362                                 dec_unacked(epoch->connection);
1363 #endif
1364
1365                         if (connection->current_epoch != epoch) {
1366                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1367                                 list_del(&epoch->list);
1368                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1369                                 connection->epochs--;
1370                                 kfree(epoch);
1371
1372                                 if (rv == FE_STILL_LIVE)
1373                                         rv = FE_DESTROYED;
1374                         } else {
1375                                 epoch->flags = 0;
1376                                 atomic_set(&epoch->epoch_size, 0);
1377                                 /* atomic_set(&epoch->active, 0); is already zero */
1378                                 if (rv == FE_STILL_LIVE)
1379                                         rv = FE_RECYCLED;
1380                         }
1381                 }
1382
1383                 if (!next_epoch)
1384                         break;
1385
1386                 epoch = next_epoch;
1387         } while (1);
1388
1389         spin_unlock(&connection->epoch_lock);
1390
1391         return rv;
1392 }
1393
1394 static enum write_ordering_e
1395 max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
1396 {
1397         struct disk_conf *dc;
1398
1399         dc = rcu_dereference(bdev->disk_conf);
1400
1401         if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
1402                 wo = WO_DRAIN_IO;
1403         if (wo == WO_DRAIN_IO && !dc->disk_drain)
1404                 wo = WO_NONE;
1405
1406         return wo;
1407 }
1408
1409 /**
1410  * drbd_bump_write_ordering() - Fall back to another write ordering method
1411  * @resource:   DRBD resource.
1412  * @wo:         Write ordering method to try.
1413  */
1414 void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
1415                               enum write_ordering_e wo)
1416 {
1417         struct drbd_device *device;
1418         enum write_ordering_e pwo;
1419         int vnr;
1420         static char *write_ordering_str[] = {
1421                 [WO_NONE] = "none",
1422                 [WO_DRAIN_IO] = "drain",
1423                 [WO_BDEV_FLUSH] = "flush",
1424         };
1425
1426         pwo = resource->write_ordering;
1427         if (wo != WO_BDEV_FLUSH)
1428                 wo = min(pwo, wo);
1429         rcu_read_lock();
1430         idr_for_each_entry(&resource->devices, device, vnr) {
1431                 if (get_ldev(device)) {
1432                         wo = max_allowed_wo(device->ldev, wo);
1433                         if (device->ldev == bdev)
1434                                 bdev = NULL;
1435                         put_ldev(device);
1436                 }
1437         }
1438
1439         if (bdev)
1440                 wo = max_allowed_wo(bdev, wo);
1441
1442         rcu_read_unlock();
1443
1444         resource->write_ordering = wo;
1445         if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH)
1446                 drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
1447 }
1448
1449 /*
1450  * We *may* ignore the discard-zeroes-data setting, if so configured.
1451  *
1452  * Assumption is that "discard_zeroes_data=0" is only set because the backend
1453  * may ignore partial unaligned discards.
1454  *
1455  * LVM/DM thin as of at least
1456  *   LVM version:     2.02.115(2)-RHEL7 (2015-01-28)
1457  *   Library version: 1.02.93-RHEL7 (2015-01-28)
1458  *   Driver version:  4.29.0
1459  * still behaves this way.
1460  *
1461  * For unaligned (wrt. alignment and granularity) or too small discards,
1462  * we zero out the initial and/or trailing unaligned partial chunks,
1463  * but discard all the aligned full chunks.
1464  *
1465  * At least for LVM/DM thin, the result is effectively "discard_zeroes_data=1".
1466  */
1467 int drbd_issue_discard_or_zero_out(struct drbd_device *device, sector_t start, unsigned int nr_sectors, bool discard)
1468 {
1469         struct block_device *bdev = device->ldev->backing_bdev;
1470         struct request_queue *q = bdev_get_queue(bdev);
1471         sector_t tmp, nr;
1472         unsigned int max_discard_sectors, granularity;
1473         int alignment;
1474         int err = 0;
1475
1476         if (!discard)
1477                 goto zero_out;
1478
1479         /* Zero-sector (unknown) and one-sector granularities are the same.  */
1480         granularity = max(q->limits.discard_granularity >> 9, 1U);
1481         alignment = (bdev_discard_alignment(bdev) >> 9) % granularity;
1482
1483         max_discard_sectors = min(q->limits.max_discard_sectors, (1U << 22));
1484         max_discard_sectors -= max_discard_sectors % granularity;
1485         if (unlikely(!max_discard_sectors))
1486                 goto zero_out;
1487
1488         if (nr_sectors < granularity)
1489                 goto zero_out;
1490
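             /* If the start is not discard-aligned, zero out the unaligned head up
              * to the next alignment boundary and discard from there on.
              * E.g. (made-up numbers): granularity 8, alignment 0, start sector 5:
              * sectors 5..7 get zeroed out, discards start at sector 8. */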
1491         tmp = start;
1492         if (sector_div(tmp, granularity) != alignment) {
1493                 if (nr_sectors < 2*granularity)
1494                         goto zero_out;
1495                 /* start + gran - (start + gran - align) % gran */
1496                 tmp = start + granularity - alignment;
1497                 tmp = start + granularity - sector_div(tmp, granularity);
1498
1499                 nr = tmp - start;
1500                 err |= blkdev_issue_zeroout(bdev, start, nr, GFP_NOIO, 0);
1501                 nr_sectors -= nr;
1502                 start = tmp;
1503         }
1504         while (nr_sectors >= granularity) {
1505                 nr = min_t(sector_t, nr_sectors, max_discard_sectors);
1506                 err |= blkdev_issue_discard(bdev, start, nr, GFP_NOIO, 0);
1507                 nr_sectors -= nr;
1508                 start += nr;
1509         }
1510  zero_out:
1511         if (nr_sectors) {
1512                 err |= blkdev_issue_zeroout(bdev, start, nr_sectors, GFP_NOIO, 0);
1513         }
1514         return err != 0;
1515 }
1516
1517 static bool can_do_reliable_discards(struct drbd_device *device)
1518 {
1519         struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
1520         struct disk_conf *dc;
1521         bool can_do;
1522
1523         if (!blk_queue_discard(q))
1524                 return false;
1525
1526         if (q->limits.discard_zeroes_data)
1527                 return true;
1528
1529         rcu_read_lock();
1530         dc = rcu_dereference(device->ldev->disk_conf);
1531         can_do = dc->discard_zeroes_if_aligned;
1532         rcu_read_unlock();
1533         return can_do;
1534 }
1535
1536 static void drbd_issue_peer_discard(struct drbd_device *device, struct drbd_peer_request *peer_req)
1537 {
1538         /* If the backend cannot discard, or does not guarantee
1539          * read-back zeroes in discarded ranges, we fall back to
1540          * zero-out.  Unless configuration specifically requested
1541          * otherwise. */
1542         if (!can_do_reliable_discards(device))
1543                 peer_req->flags |= EE_IS_TRIM_USE_ZEROOUT;
1544
1545         if (drbd_issue_discard_or_zero_out(device, peer_req->i.sector,
1546             peer_req->i.size >> 9, !(peer_req->flags & EE_IS_TRIM_USE_ZEROOUT)))
1547                 peer_req->flags |= EE_WAS_ERROR;
1548         drbd_endio_write_sec_final(peer_req);
1549 }
1550
1551 static void drbd_issue_peer_wsame(struct drbd_device *device,
1552                                   struct drbd_peer_request *peer_req)
1553 {
1554         struct block_device *bdev = device->ldev->backing_bdev;
1555         sector_t s = peer_req->i.sector;
1556         sector_t nr = peer_req->i.size >> 9;
1557         if (blkdev_issue_write_same(bdev, s, nr, GFP_NOIO, peer_req->pages))
1558                 peer_req->flags |= EE_WAS_ERROR;
1559         drbd_endio_write_sec_final(peer_req);
1560 }
1561
1562
1563 /**
1564  * drbd_submit_peer_request()
1565  * @device:     DRBD device.
1566  * @peer_req:   peer request
1567  * @op, @op_flags: request operation and flags, see bio->bi_opf
1568  *
1569  * May spread the pages to multiple bios,
1570  * depending on bio_add_page restrictions.
1571  *
1572  * Returns 0 if all bios have been submitted,
1573  * -ENOMEM if we could not allocate enough bios,
1574  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1575  *  single page to an empty bio (which should never happen and likely indicates
1576  *  that the lower level IO stack is in some way broken). This has been observed
1577  *  on certain Xen deployments.
1578  */
1579 /* TODO allocate from our own bio_set. */
1580 int drbd_submit_peer_request(struct drbd_device *device,
1581                              struct drbd_peer_request *peer_req,
1582                              const unsigned op, const unsigned op_flags,
1583                              const int fault_type)
1584 {
1585         struct bio *bios = NULL;
1586         struct bio *bio;
1587         struct page *page = peer_req->pages;
1588         sector_t sector = peer_req->i.sector;
1589         unsigned data_size = peer_req->i.size;
1590         unsigned n_bios = 0;
1591         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
1592         int err = -ENOMEM;
1593
1594         /* TRIM/DISCARD: for now, always use the helper function
1595          * blkdev_issue_zeroout(..., discard=true).
1596          * It's synchronous, but it does the right thing wrt. bio splitting.
1597          * Correctness first, performance later.  Next step is to code an
1598          * asynchronous variant of the same.
1599          */
1600         if (peer_req->flags & (EE_IS_TRIM|EE_WRITE_SAME)) {
1601                 /* wait for all pending IO completions, before we start
1602                  * zeroing things out. */
1603                 conn_wait_active_ee_empty(peer_req->peer_device->connection);
1604                 /* add it to the active list now,
1605                  * so we can find it to present it in debugfs */
1606                 peer_req->submit_jif = jiffies;
1607                 peer_req->flags |= EE_SUBMITTED;
1608
1609                 /* If this was a resync request from receive_rs_deallocated(),
1610                  * it is already on the sync_ee list */
1611                 if (list_empty(&peer_req->w.list)) {
1612                         spin_lock_irq(&device->resource->req_lock);
1613                         list_add_tail(&peer_req->w.list, &device->active_ee);
1614                         spin_unlock_irq(&device->resource->req_lock);
1615                 }
1616
1617                 if (peer_req->flags & EE_IS_TRIM)
1618                         drbd_issue_peer_discard(device, peer_req);
1619                 else /* EE_WRITE_SAME */
1620                         drbd_issue_peer_wsame(device, peer_req);
1621                 return 0;
1622         }
1623
1624         /* In most cases, we will only need one bio.  But in case the lower
1625          * level restrictions happen to be different at this offset on this
1626          * side than those of the sending peer, we may need to submit the
1627          * request in more than one bio.
1628          *
1629          * Plain bio_alloc is good enough here, this is no DRBD internally
1630          * generated bio, but a bio allocated on behalf of the peer.
1631          */
1632 next_bio:
1633         bio = bio_alloc(GFP_NOIO, nr_pages);
1634         if (!bio) {
1635                 drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
1636                 goto fail;
1637         }
1638         /* > peer_req->i.sector, unless this is the first bio */
1639         bio->bi_iter.bi_sector = sector;
1640         bio->bi_bdev = device->ldev->backing_bdev;
1641         bio_set_op_attrs(bio, op, op_flags);
1642         bio->bi_private = peer_req;
1643         bio->bi_end_io = drbd_peer_request_endio;
1644
1645         bio->bi_next = bios;
1646         bios = bio;
1647         ++n_bios;
1648
1649         page_chain_for_each(page) {
1650                 unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
1651                 if (!bio_add_page(bio, page, len, 0)) {
1652                         /* A single page must always be possible!
1653                          * But in case it fails anyways,
1654                          * we deal with it, and complain (below). */
1655                         if (bio->bi_vcnt == 0) {
1656                                 drbd_err(device,
1657                                         "bio_add_page failed for len=%u, "
1658                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1659                                         len, (uint64_t)bio->bi_iter.bi_sector);
1660                                 err = -ENOSPC;
1661                                 goto fail;
1662                         }
1663                         goto next_bio;
1664                 }
1665                 data_size -= len;
1666                 sector += len >> 9;
1667                 --nr_pages;
1668         }
1669         D_ASSERT(device, data_size == 0);
1670         D_ASSERT(device, page == NULL);
1671
1672         atomic_set(&peer_req->pending_bios, n_bios);
1673         /* for debugfs: update timestamp, mark as submitted */
1674         peer_req->submit_jif = jiffies;
1675         peer_req->flags |= EE_SUBMITTED;
1676         do {
1677                 bio = bios;
1678                 bios = bios->bi_next;
1679                 bio->bi_next = NULL;
1680
1681                 drbd_generic_make_request(device, fault_type, bio);
1682         } while (bios);
1683         return 0;
1684
1685 fail:
1686         while (bios) {
1687                 bio = bios;
1688                 bios = bios->bi_next;
1689                 bio_put(bio);
1690         }
1691         return err;
1692 }
1693
1694 static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1695                                              struct drbd_peer_request *peer_req)
1696 {
1697         struct drbd_interval *i = &peer_req->i;
1698
1699         drbd_remove_interval(&device->write_requests, i);
1700         drbd_clear_interval(i);
1701
1702         /* Wake up any processes waiting for this peer request to complete.  */
1703         if (i->waiting)
1704                 wake_up(&device->misc_wait);
1705 }
1706
1707 static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1708 {
1709         struct drbd_peer_device *peer_device;
1710         int vnr;
1711
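             /* drbd_wait_ee_list_empty() may sleep, so we must not stay in the RCU
              * read-side section: take a reference on the device, drop the RCU lock
              * around the wait, and re-acquire it to continue the iteration. */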
1712         rcu_read_lock();
1713         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1714                 struct drbd_device *device = peer_device->device;
1715
1716                 kref_get(&device->kref);
1717                 rcu_read_unlock();
1718                 drbd_wait_ee_list_empty(device, &device->active_ee);
1719                 kref_put(&device->kref, drbd_destroy_device);
1720                 rcu_read_lock();
1721         }
1722         rcu_read_unlock();
1723 }
1724
1725 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1726 {
1727         int rv;
1728         struct p_barrier *p = pi->data;
1729         struct drbd_epoch *epoch;
1730
1731         /* FIXME these are unacked on connection,
1732          * not a specific (peer)device.
1733          */
1734         connection->current_epoch->barrier_nr = p->barrier;
1735         connection->current_epoch->connection = connection;
1736         rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1737
1738         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1739          * the activity log, which means it would not be resynced in case the
1740          * R_PRIMARY crashes now.
1741          * Therefore we must send the barrier_ack after the barrier request was
1742          * completed. */
1743         switch (connection->resource->write_ordering) {
1744         case WO_NONE:
1745                 if (rv == FE_RECYCLED)
1746                         return 0;
1747
1748                 /* receiver context, in the writeout path of the other node.
1749                  * avoid potential distributed deadlock */
1750                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1751                 if (epoch)
1752                         break;
1753                 else
1754                         drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1755                         /* Fall through */
1756
1757         case WO_BDEV_FLUSH:
1758         case WO_DRAIN_IO:
1759                 conn_wait_active_ee_empty(connection);
1760                 drbd_flush(connection);
1761
1762                 if (atomic_read(&connection->current_epoch->epoch_size)) {
1763                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1764                         if (epoch)
1765                                 break;
1766                 }
1767
1768                 return 0;
1769         default:
1770                 drbd_err(connection, "Strangeness in resource->write_ordering %d\n",
1771                          connection->resource->write_ordering);
1772                 return -EIO;
1773         }
1774
1775         epoch->flags = 0;
1776         atomic_set(&epoch->epoch_size, 0);
1777         atomic_set(&epoch->active, 0);
1778
1779         spin_lock(&connection->epoch_lock);
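             /* Only install the newly allocated epoch if the current one actually
              * contains writes; an empty current_epoch has been recycled already
              * and can simply be kept. */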
1780         if (atomic_read(&connection->current_epoch->epoch_size)) {
1781                 list_add(&epoch->list, &connection->current_epoch->list);
1782                 connection->current_epoch = epoch;
1783                 connection->epochs++;
1784         } else {
1785                 /* The current_epoch got recycled while we allocated this one... */
1786                 kfree(epoch);
1787         }
1788         spin_unlock(&connection->epoch_lock);
1789
1790         return 0;
1791 }
1792
1793 /* quick wrapper in case payload size != request_size (write same) */
1794 static void drbd_csum_ee_size(struct crypto_ahash *h,
1795                               struct drbd_peer_request *r, void *d,
1796                               unsigned int payload_size)
1797 {
1798         unsigned int tmp = r->i.size;
1799         r->i.size = payload_size;
1800         drbd_csum_ee(h, r, d);
1801         r->i.size = tmp;
1802 }
1803
1804 /* used from receive_RSDataReply (recv_resync_read)
1805  * and from receive_Data.
1806  * data_size: actual payload ("data in")
1807  *      for normal writes that is bi_size.
1808  *      for discards, that is zero.
1809  *      for write same, it is logical_block_size.
1810  * both trim and write same have the bi_size ("data len to be affected")
1811  * as extra argument in the packet header.
1812  */
1813 static struct drbd_peer_request *
1814 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1815               struct packet_info *pi) __must_hold(local)
1816 {
1817         struct drbd_device *device = peer_device->device;
1818         const sector_t capacity = drbd_get_capacity(device->this_bdev);
1819         struct drbd_peer_request *peer_req;
1820         struct page *page;
1821         int digest_size, err;
1822         unsigned int data_size = pi->size, ds;
1823         void *dig_in = peer_device->connection->int_dig_in;
1824         void *dig_vv = peer_device->connection->int_dig_vv;
1825         unsigned long *data;
1826         struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1827         struct p_trim *wsame = (pi->cmd == P_WSAME) ? pi->data : NULL;
1828
1829         digest_size = 0;
1830         if (!trim && peer_device->connection->peer_integrity_tfm) {
1831                 digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
1832                 /*
1833                  * FIXME: Receive the incoming digest into the receive buffer
1834                  *        here, together with its struct p_data?
1835                  */
1836                 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1837                 if (err)
1838                         return NULL;
1839                 data_size -= digest_size;
1840         }
1841
1842         /* assume request_size == data_size, but special case trim and wsame. */
1843         ds = data_size;
1844         if (trim) {
1845                 if (!expect(data_size == 0))
1846                         return NULL;
1847                 ds = be32_to_cpu(trim->size);
1848         } else if (wsame) {
1849                 if (data_size != queue_logical_block_size(device->rq_queue)) {
1850                         drbd_err(peer_device, "data size (%u) != drbd logical block size (%u)\n",
1851                                 data_size, queue_logical_block_size(device->rq_queue));
1852                         return NULL;
1853                 }
1854                 if (data_size != bdev_logical_block_size(device->ldev->backing_bdev)) {
1855                         drbd_err(peer_device, "data size (%u) != backend logical block size (%u)\n",
1856                                 data_size, bdev_logical_block_size(device->ldev->backing_bdev));
1857                         return NULL;
1858                 }
1859                 ds = be32_to_cpu(wsame->size);
1860         }
1861
1862         if (!expect(IS_ALIGNED(ds, 512)))
1863                 return NULL;
1864         if (trim || wsame) {
1865                 if (!expect(ds <= (DRBD_MAX_BBIO_SECTORS << 9)))
1866                         return NULL;
1867         } else if (!expect(ds <= DRBD_MAX_BIO_SIZE))
1868                 return NULL;
1869
1870         /* even though we trust our peer,
1871          * we sometimes have to double check. */
1872         if (sector + (ds>>9) > capacity) {
1873                 drbd_err(device, "request from peer beyond end of local disk: "
1874                         "capacity: %llus < sector: %llus + size: %u\n",
1875                         (unsigned long long)capacity,
1876                         (unsigned long long)sector, ds);
1877                 return NULL;
1878         }
1879
1880         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1881          * "criss-cross" setup, that might cause write-out on some other DRBD,
1882          * which in turn might block on the other node at this very place.  */
1883         peer_req = drbd_alloc_peer_req(peer_device, id, sector, ds, data_size, GFP_NOIO);
1884         if (!peer_req)
1885                 return NULL;
1886
1887         peer_req->flags |= EE_WRITE;
1888         if (trim) {
1889                 peer_req->flags |= EE_IS_TRIM;
1890                 return peer_req;
1891         }
1892         if (wsame)
1893                 peer_req->flags |= EE_WRITE_SAME;
1894
1895         /* receive payload size bytes into page chain */
1896         ds = data_size;
1897         page = peer_req->pages;
1898         page_chain_for_each(page) {
1899                 unsigned len = min_t(int, ds, PAGE_SIZE);
1900                 data = kmap(page);
1901                 err = drbd_recv_all_warn(peer_device->connection, data, len);
1902                 if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1903                         drbd_err(device, "Fault injection: Corrupting data on receive\n");
1904                         data[0] = data[0] ^ (unsigned long)-1;
1905                 }
1906                 kunmap(page);
1907                 if (err) {
1908                         drbd_free_peer_req(device, peer_req);
1909                         return NULL;
1910                 }
1911                 ds -= len;
1912         }
1913
1914         if (digest_size) {
1915                 drbd_csum_ee_size(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv, data_size);
1916                 if (memcmp(dig_in, dig_vv, digest_size)) {
1917                         drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1918                                 (unsigned long long)sector, data_size);
1919                         drbd_free_peer_req(device, peer_req);
1920                         return NULL;
1921                 }
1922         }
1923         device->recv_cnt += data_size >> 9;
1924         return peer_req;
1925 }
1926
1927 /* drbd_drain_block() just takes a data block
1928  * out of the socket input buffer, and discards it.
1929  */
1930 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1931 {
1932         struct page *page;
1933         int err = 0;
1934         void *data;
1935
1936         if (!data_size)
1937                 return 0;
1938
1939         page = drbd_alloc_pages(peer_device, 1, 1);
1940
1941         data = kmap(page);
1942         while (data_size) {
1943                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1944
1945                 err = drbd_recv_all_warn(peer_device->connection, data, len);
1946                 if (err)
1947                         break;
1948                 data_size -= len;
1949         }
1950         kunmap(page);
1951         drbd_free_pages(peer_device->device, page, 0);
1952         return err;
1953 }
1954
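     /* Read reply for a read request we sent to the peer (e.g. because we have
      * no usable local disk): copy the payload directly into the master bio of
      * the pending request and verify the integrity digest, if configured. */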
1955 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1956                            sector_t sector, int data_size)
1957 {
1958         struct bio_vec bvec;
1959         struct bvec_iter iter;
1960         struct bio *bio;
1961         int digest_size, err, expect;
1962         void *dig_in = peer_device->connection->int_dig_in;
1963         void *dig_vv = peer_device->connection->int_dig_vv;
1964
1965         digest_size = 0;
1966         if (peer_device->connection->peer_integrity_tfm) {
1967                 digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
1968                 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1969                 if (err)
1970                         return err;
1971                 data_size -= digest_size;
1972         }
1973
1974         /* optimistically update recv_cnt.  if receiving fails below,
1975          * we disconnect anyways, and counters will be reset. */
1976         peer_device->device->recv_cnt += data_size>>9;
1977
1978         bio = req->master_bio;
1979         D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
1980
1981         bio_for_each_segment(bvec, bio, iter) {
1982                 void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1983                 expect = min_t(int, data_size, bvec.bv_len);
1984                 err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1985                 kunmap(bvec.bv_page);
1986                 if (err)
1987                         return err;
1988                 data_size -= expect;
1989         }
1990
1991         if (digest_size) {
1992                 drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1993                 if (memcmp(dig_in, dig_vv, digest_size)) {
1994                         drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
1995                         return -EINVAL;
1996                 }
1997         }
1998
1999         D_ASSERT(peer_device->device, data_size == 0);
2000         return 0;
2001 }
2002
2003 /*
2004  * e_end_resync_block() is called in ack_sender context via
2005  * drbd_finish_peer_reqs().
2006  */
2007 static int e_end_resync_block(struct drbd_work *w, int unused)
2008 {
2009         struct drbd_peer_request *peer_req =
2010                 container_of(w, struct drbd_peer_request, w);
2011         struct drbd_peer_device *peer_device = peer_req->peer_device;
2012         struct drbd_device *device = peer_device->device;
2013         sector_t sector = peer_req->i.sector;
2014         int err;
2015
2016         D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2017
2018         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2019                 drbd_set_in_sync(device, sector, peer_req->i.size);
2020                 err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
2021         } else {
2022                 /* Record failure to sync */
2023                 drbd_rs_failed_io(device, sector, peer_req->i.size);
2024
2025                 err  = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2026         }
2027         dec_unacked(device);
2028
2029         return err;
2030 }
2031
2032 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
2033                             struct packet_info *pi) __releases(local)
2034 {
2035         struct drbd_device *device = peer_device->device;
2036         struct drbd_peer_request *peer_req;
2037
2038         peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
2039         if (!peer_req)
2040                 goto fail;
2041
2042         dec_rs_pending(device);
2043
2044         inc_unacked(device);
2045         /* corresponding dec_unacked() in e_end_resync_block()
2046          * respective _drbd_clear_done_ee */
2047
2048         peer_req->w.cb = e_end_resync_block;
2049         peer_req->submit_jif = jiffies;
2050
2051         spin_lock_irq(&device->resource->req_lock);
2052         list_add_tail(&peer_req->w.list, &device->sync_ee);
2053         spin_unlock_irq(&device->resource->req_lock);
2054
2055         atomic_add(pi->size >> 9, &device->rs_sect_ev);
2056         if (drbd_submit_peer_request(device, peer_req, REQ_OP_WRITE, 0,
2057                                      DRBD_FAULT_RS_WR) == 0)
2058                 return 0;
2059
2060         /* don't care for the reason here */
2061         drbd_err(device, "submit failed, triggering re-connect\n");
2062         spin_lock_irq(&device->resource->req_lock);
2063         list_del(&peer_req->w.list);
2064         spin_unlock_irq(&device->resource->req_lock);
2065
2066         drbd_free_peer_req(device, peer_req);
2067 fail:
2068         put_ldev(device);
2069         return -EIO;
2070 }
2071
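     /* The peer echoes back the "id" we put into our request packets, which is
      * simply the kernel address of our drbd_request.  Before using it as a
      * pointer, verify that an interval with exactly that address is present
      * in the given tree at the given sector. */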
2072 static struct drbd_request *
2073 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
2074              sector_t sector, bool missing_ok, const char *func)
2075 {
2076         struct drbd_request *req;
2077
2078         /* Request object according to our peer */
2079         req = (struct drbd_request *)(unsigned long)id;
2080         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
2081                 return req;
2082         if (!missing_ok) {
2083                 drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
2084                         (unsigned long)id, (unsigned long long)sector);
2085         }
2086         return NULL;
2087 }
2088
2089 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
2090 {
2091         struct drbd_peer_device *peer_device;
2092         struct drbd_device *device;
2093         struct drbd_request *req;
2094         sector_t sector;
2095         int err;
2096         struct p_data *p = pi->data;
2097
2098         peer_device = conn_peer_device(connection, pi->vnr);
2099         if (!peer_device)
2100                 return -EIO;
2101         device = peer_device->device;
2102
2103         sector = be64_to_cpu(p->sector);
2104
2105         spin_lock_irq(&device->resource->req_lock);
2106         req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
2107         spin_unlock_irq(&device->resource->req_lock);
2108         if (unlikely(!req))
2109                 return -EIO;
2110
2111         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
2112          * special casing it there for the various failure cases.
2113          * still no race with drbd_fail_pending_reads */
2114         err = recv_dless_read(peer_device, req, sector, pi->size);
2115         if (!err)
2116                 req_mod(req, DATA_RECEIVED);
2117         /* else: nothing. handled from drbd_disconnect...
2118          * I don't think we may complete this just yet
2119          * in case we are "on-disconnect: freeze" */
2120
2121         return err;
2122 }
2123
2124 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
2125 {
2126         struct drbd_peer_device *peer_device;
2127         struct drbd_device *device;
2128         sector_t sector;
2129         int err;
2130         struct p_data *p = pi->data;
2131
2132         peer_device = conn_peer_device(connection, pi->vnr);
2133         if (!peer_device)
2134                 return -EIO;
2135         device = peer_device->device;
2136
2137         sector = be64_to_cpu(p->sector);
2138         D_ASSERT(device, p->block_id == ID_SYNCER);
2139
2140         if (get_ldev(device)) {
2141                 /* data is submitted to disk within recv_resync_read.
2142                  * corresponding put_ldev done below on error,
2143                  * or in drbd_peer_request_endio. */
2144                 err = recv_resync_read(peer_device, sector, pi);
2145         } else {
2146                 if (__ratelimit(&drbd_ratelimit_state))
2147                         drbd_err(device, "Can not write resync data to local disk.\n");
2148
2149                 err = drbd_drain_block(peer_device, pi->size);
2150
2151                 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2152         }
2153
2154         atomic_add(pi->size >> 9, &device->rs_sect_in);
2155
2156         return err;
2157 }
2158
2159 static void restart_conflicting_writes(struct drbd_device *device,
2160                                        sector_t sector, int size)
2161 {
2162         struct drbd_interval *i;
2163         struct drbd_request *req;
2164
2165         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2166                 if (!i->local)
2167                         continue;
2168                 req = container_of(i, struct drbd_request, i);
2169                 if (req->rq_state & RQ_LOCAL_PENDING ||
2170                     !(req->rq_state & RQ_POSTPONED))
2171                         continue;
2172                 /* as it is RQ_POSTPONED, this will cause it to
2173                  * be queued on the retry workqueue. */
2174                 __req_mod(req, CONFLICT_RESOLVED, NULL);
2175         }
2176 }
2177
2178 /*
2179  * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
2180  */
2181 static int e_end_block(struct drbd_work *w, int cancel)
2182 {
2183         struct drbd_peer_request *peer_req =
2184                 container_of(w, struct drbd_peer_request, w);
2185         struct drbd_peer_device *peer_device = peer_req->peer_device;
2186         struct drbd_device *device = peer_device->device;
2187         sector_t sector = peer_req->i.sector;
2188         int err = 0, pcmd;
2189
2190         if (peer_req->flags & EE_SEND_WRITE_ACK) {
2191                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
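                             /* While we are SyncSource..PausedSyncT and the request may
                              * set blocks in sync, ack with P_RS_WRITE_ACK and mark the
                              * range in sync locally; otherwise a plain P_WRITE_ACK is
                              * sufficient. */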
2192                         pcmd = (device->state.conn >= C_SYNC_SOURCE &&
2193                                 device->state.conn <= C_PAUSED_SYNC_T &&
2194                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
2195                                 P_RS_WRITE_ACK : P_WRITE_ACK;
2196                         err = drbd_send_ack(peer_device, pcmd, peer_req);
2197                         if (pcmd == P_RS_WRITE_ACK)
2198                                 drbd_set_in_sync(device, sector, peer_req->i.size);
2199                 } else {
2200                         err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2201                         /* we expect it to be marked out of sync anyways...
2202                          * maybe assert this?  */
2203                 }
2204                 dec_unacked(device);
2205         }
2206
2207         /* we delete from the conflict detection hash _after_ we sent out the
2208          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
2209         if (peer_req->flags & EE_IN_INTERVAL_TREE) {
2210                 spin_lock_irq(&device->resource->req_lock);
2211                 D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
2212                 drbd_remove_epoch_entry_interval(device, peer_req);
2213                 if (peer_req->flags & EE_RESTART_REQUESTS)
2214                         restart_conflicting_writes(device, sector, peer_req->i.size);
2215                 spin_unlock_irq(&device->resource->req_lock);
2216         } else
2217                 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2218
2219         drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
2220
2221         return err;
2222 }
2223
2224 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
2225 {
2226         struct drbd_peer_request *peer_req =
2227                 container_of(w, struct drbd_peer_request, w);
2228         struct drbd_peer_device *peer_device = peer_req->peer_device;
2229         int err;
2230
2231         err = drbd_send_ack(peer_device, ack, peer_req);
2232         dec_unacked(peer_device->device);
2233
2234         return err;
2235 }
2236
2237 static int e_send_superseded(struct drbd_work *w, int unused)
2238 {
2239         return e_send_ack(w, P_SUPERSEDED);
2240 }
2241
2242 static int e_send_retry_write(struct drbd_work *w, int unused)
2243 {
2244         struct drbd_peer_request *peer_req =
2245                 container_of(w, struct drbd_peer_request, w);
2246         struct drbd_connection *connection = peer_req->peer_device->connection;
2247
2248         return e_send_ack(w, connection->agreed_pro_version >= 100 ?
2249                              P_RETRY_WRITE : P_SUPERSEDED);
2250 }
2251
2252 static bool seq_greater(u32 a, u32 b)
2253 {
2254         /*
2255          * We assume 32-bit wrap-around here.
2256          * For 24-bit wrap-around, we would have to shift:
2257          *  a <<= 8; b <<= 8;
2258          */
2259         return (s32)a - (s32)b > 0;
2260 }
2261
2262 static u32 seq_max(u32 a, u32 b)
2263 {
2264         return seq_greater(a, b) ? a : b;
2265 }
2266
2267 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
2268 {
2269         struct drbd_device *device = peer_device->device;
2270         unsigned int newest_peer_seq;
2271
2272         if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
2273                 spin_lock(&device->peer_seq_lock);
2274                 newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2275                 device->peer_seq = newest_peer_seq;
2276                 spin_unlock(&device->peer_seq_lock);
2277                 /* wake up only if we actually changed device->peer_seq */
2278                 if (peer_seq == newest_peer_seq)
2279                         wake_up(&device->seq_wait);
2280         }
2281 }
2282
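     /* Two ranges, each given as start sector plus length in bytes, overlap
      * iff neither one ends before the other begins; the byte lengths are
      * converted to sectors (>>9) for the comparison. */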
2283 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
2284 {
2285         return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
2286 }
2287
2288 /* maybe change sync_ee into interval trees as well? */
2289 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2290 {
2291         struct drbd_peer_request *rs_req;
2292         bool rv = false;
2293
2294         spin_lock_irq(&device->resource->req_lock);
2295         list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2296                 if (overlaps(peer_req->i.sector, peer_req->i.size,
2297                              rs_req->i.sector, rs_req->i.size)) {
2298                         rv = true;
2299                         break;
2300                 }
2301         }
2302         spin_unlock_irq(&device->resource->req_lock);
2303
2304         return rv;
2305 }
2306
2307 /* Called from receive_Data.
2308  * Synchronize packets on sock with packets on msock.
2309  *
2310  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
2311  * packet traveling on msock, they are still processed in the order they have
2312  * been sent.
2313  *
2314  * Note: we don't care for Ack packets overtaking P_DATA packets.
2315  *
2316  * In case peer_seq is larger than device->peer_seq, there are
2317  * outstanding packets on the msock. We wait for them to arrive.
2318  * In case we are the logically next packet, we update device->peer_seq
2319  * ourselves. Correctly handles 32bit wrap around.
2320  *
2321  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2322  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2323  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2324  * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
2325  *
2326  * returns 0 if we may process the packet,
2327  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
2328 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2329 {
2330         struct drbd_device *device = peer_device->device;
2331         DEFINE_WAIT(wait);
2332         long timeout;
2333         int ret = 0, tp;
2334
2335         if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2336                 return 0;
2337
2338         spin_lock(&device->peer_seq_lock);
2339         for (;;) {
2340                 if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2341                         device->peer_seq = seq_max(device->peer_seq, peer_seq);
2342                         break;
2343                 }
2344
2345                 if (signal_pending(current)) {
2346                         ret = -ERESTARTSYS;
2347                         break;
2348                 }
2349
2350                 rcu_read_lock();
2351                 tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2352                 rcu_read_unlock();
2353
2354                 if (!tp)
2355                         break;
2356
2357                 /* Only need to wait if two_primaries is enabled */
2358                 prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2359                 spin_unlock(&device->peer_seq_lock);
2360                 rcu_read_lock();
2361                 timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2362                 rcu_read_unlock();
2363                 timeout = schedule_timeout(timeout);
2364                 spin_lock(&device->peer_seq_lock);
2365                 if (!timeout) {
2366                         ret = -ETIMEDOUT;
2367                         drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2368                         break;
2369                 }
2370         }
2371         spin_unlock(&device->peer_seq_lock);
2372         finish_wait(&device->seq_wait, &wait);
2373         return ret;
2374 }
2375
2376 /* see also bio_flags_to_wire()
2377  * DRBD_REQ_*, because we need to semantically map the flags to data packet
2378  * flags and back. We may replicate to other kernel versions. */
2379 static unsigned long wire_flags_to_bio_flags(u32 dpf)
2380 {
2381         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2382                 (dpf & DP_FUA ? REQ_FUA : 0) |
2383                 (dpf & DP_FLUSH ? REQ_PREFLUSH : 0);
2384 }
2385
2386 static unsigned long wire_flags_to_bio_op(u32 dpf)
2387 {
2388         if (dpf & DP_DISCARD)
2389                 return REQ_OP_DISCARD;
2390         else
2391                 return REQ_OP_WRITE;
2392 }
2393
2394 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2395                                     unsigned int size)
2396 {
2397         struct drbd_interval *i;
2398
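             /* Completing the master bio requires dropping req_lock, which
              * invalidates the interval walk; hence the goto repeat below. */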
2399     repeat:
2400         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2401                 struct drbd_request *req;
2402                 struct bio_and_error m;
2403
2404                 if (!i->local)
2405                         continue;
2406                 req = container_of(i, struct drbd_request, i);
2407                 if (!(req->rq_state & RQ_POSTPONED))
2408                         continue;
2409                 req->rq_state &= ~RQ_POSTPONED;
2410                 __req_mod(req, NEG_ACKED, &m);
2411                 spin_unlock_irq(&device->resource->req_lock);
2412                 if (m.bio)
2413                         complete_master_bio(device, &m);
2414                 spin_lock_irq(&device->resource->req_lock);
2415                 goto repeat;
2416         }
2417 }
2418
2419 static int handle_write_conflicts(struct drbd_device *device,
2420                                   struct drbd_peer_request *peer_req)
2421 {
2422         struct drbd_connection *connection = peer_req->peer_device->connection;
2423         bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2424         sector_t sector = peer_req->i.sector;
2425         const unsigned int size = peer_req->i.size;
2426         struct drbd_interval *i;
2427         bool equal;
2428         int err;
2429
2430         /*
2431          * Inserting the peer request into the write_requests tree will prevent
2432          * new conflicting local requests from being added.
2433          */
2434         drbd_insert_interval(&device->write_requests, &peer_req->i);
2435
2436     repeat:
2437         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2438                 if (i == &peer_req->i)
2439                         continue;
2440                 if (i->completed)
2441                         continue;
2442
2443                 if (!i->local) {
2444                         /*
2445                          * Our peer has sent a conflicting remote request; this
2446                          * should not happen in a two-node setup.  Wait for the
2447                          * earlier peer request to complete.
2448                          */
2449                         err = drbd_wait_misc(device, i);
2450                         if (err)
2451                                 goto out;
2452                         goto repeat;
2453                 }
2454
2455                 equal = i->sector == sector && i->size == size;
2456                 if (resolve_conflicts) {
2457                         /*
2458                          * If the peer request is fully contained within the
2459                          * overlapping request, it can be considered overwritten
2460                          * and thus superseded; otherwise, it will be retried
2461                          * once all overlapping requests have completed.
2462                          */
2463                         bool superseded = i->sector <= sector && i->sector +
2464                                        (i->size >> 9) >= sector + (size >> 9);
2465
2466                         if (!equal)
2467                                 drbd_alert(device, "Concurrent writes detected: "
2468                                                "local=%llus +%u, remote=%llus +%u, "
2469                                                "assuming %s came first\n",
2470                                           (unsigned long long)i->sector, i->size,
2471                                           (unsigned long long)sector, size,
2472                                           superseded ? "local" : "remote");
2473
2474                         peer_req->w.cb = superseded ? e_send_superseded :
2475                                                    e_send_retry_write;
2476                         list_add_tail(&peer_req->w.list, &device->done_ee);
2477                         queue_work(connection->ack_sender, &peer_req->peer_device->send_acks_work);
2478
2479                         err = -ENOENT;
2480                         goto out;
2481                 } else {
2482                         struct drbd_request *req =
2483                                 container_of(i, struct drbd_request, i);
2484
2485                         if (!equal)
2486                                 drbd_alert(device, "Concurrent writes detected: "
2487                                                "local=%llus +%u, remote=%llus +%u\n",
2488                                           (unsigned long long)i->sector, i->size,
2489                                           (unsigned long long)sector, size);
2490
2491                         if (req->rq_state & RQ_LOCAL_PENDING ||
2492                             !(req->rq_state & RQ_POSTPONED)) {
2493                                 /*
2494                                  * Wait for the node with the discard flag to
2495                                  * decide if this request has been superseded
2496                                  * or needs to be retried.
2497                                  * Requests that have been superseded will
2498                                  * disappear from the write_requests tree.
2499                                  *
2500                                  * In addition, wait for the conflicting
2501                                  * request to finish locally before submitting
2502                                  * the conflicting peer request.
2503                                  */
2504                                 err = drbd_wait_misc(device, &req->i);
2505                                 if (err) {
2506                                         _conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2507                                         fail_postponed_requests(device, sector, size);
2508                                         goto out;
2509                                 }
2510                                 goto repeat;
2511                         }
2512                         /*
2513                          * Remember to restart the conflicting requests after
2514                          * the new peer request has completed.
2515                          */
2516                         peer_req->flags |= EE_RESTART_REQUESTS;
2517                 }
2518         }
2519         err = 0;
2520
2521     out:
2522         if (err)
2523                 drbd_remove_epoch_entry_interval(device, peer_req);
2524         return err;
2525 }
2526
2527 /* mirrored write */
2528 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2529 {
2530         struct drbd_peer_device *peer_device;
2531         struct drbd_device *device;
2532         struct net_conf *nc;
2533         sector_t sector;
2534         struct drbd_peer_request *peer_req;
2535         struct p_data *p = pi->data;
2536         u32 peer_seq = be32_to_cpu(p->seq_num);
2537         int op, op_flags;
2538         u32 dp_flags;
2539         int err, tp;
2540
2541         peer_device = conn_peer_device(connection, pi->vnr);
2542         if (!peer_device)
2543                 return -EIO;
2544         device = peer_device->device;
2545
2546         if (!get_ldev(device)) {
2547                 int err2;
2548
2549                 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2550                 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2551                 atomic_inc(&connection->current_epoch->epoch_size);
2552                 err2 = drbd_drain_block(peer_device, pi->size);
2553                 if (!err)
2554                         err = err2;
2555                 return err;
2556         }
2557
2558         /*
2559          * Corresponding put_ldev done either below (on various errors), or in
2560          * drbd_peer_request_endio, if we successfully submit the data at the
2561          * end of this function.
2562          */
2563
2564         sector = be64_to_cpu(p->sector);
2565         peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2566         if (!peer_req) {
2567                 put_ldev(device);
2568                 return -EIO;
2569         }
2570
2571         peer_req->w.cb = e_end_block;
2572         peer_req->submit_jif = jiffies;
2573         peer_req->flags |= EE_APPLICATION;
2574
2575         dp_flags = be32_to_cpu(p->dp_flags);
2576         op = wire_flags_to_bio_op(dp_flags);
2577         op_flags = wire_flags_to_bio_flags(dp_flags);
2578         if (pi->cmd == P_TRIM) {
2579                 D_ASSERT(peer_device, peer_req->i.size > 0);
2580                 D_ASSERT(peer_device, op == REQ_OP_DISCARD);
2581                 D_ASSERT(peer_device, peer_req->pages == NULL);
2582         } else if (peer_req->pages == NULL) {
2583                 D_ASSERT(device, peer_req->i.size == 0);
2584                 D_ASSERT(device, dp_flags & DP_FLUSH);
2585         }
2586
2587         if (dp_flags & DP_MAY_SET_IN_SYNC)
2588                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2589
2590         spin_lock(&connection->epoch_lock);
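             /* Account this write in the currently open epoch; the epoch is closed
              * by a P_BARRIER from the peer (receive_Barrier) and finally completed
              * via drbd_may_finish_epoch(). */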
2591         peer_req->epoch = connection->current_epoch;
2592         atomic_inc(&peer_req->epoch->epoch_size);
2593         atomic_inc(&peer_req->epoch->active);
2594         spin_unlock(&connection->epoch_lock);
2595
2596         rcu_read_lock();
2597         nc = rcu_dereference(peer_device->connection->net_conf);
2598         tp = nc->two_primaries;
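             /* Peers older than protocol 100 do not encode the ack mode in dp_flags;
              * derive it from the configured wire protocol instead
              * (protocol C: write ack, protocol B: receive ack). */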
2599         if (peer_device->connection->agreed_pro_version < 100) {
2600                 switch (nc->wire_protocol) {
2601                 case DRBD_PROT_C:
2602                         dp_flags |= DP_SEND_WRITE_ACK;
2603                         break;
2604                 case DRBD_PROT_B:
2605                         dp_flags |= DP_SEND_RECEIVE_ACK;
2606                         break;
2607                 }
2608         }
2609         rcu_read_unlock();
2610
2611         if (dp_flags & DP_SEND_WRITE_ACK) {
2612                 peer_req->flags |= EE_SEND_WRITE_ACK;
2613                 inc_unacked(device);
2614                 /* corresponding dec_unacked() in e_end_block()
2615                  * respective _drbd_clear_done_ee */
2616         }
2617
2618         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2619                 /* I really don't like it that the receiver thread
2620                  * sends on the msock, but anyways */
2621                 drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
2622         }
2623
2624         if (tp) {
2625                 /* two primaries implies protocol C */
2626                 D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
2627                 peer_req->flags |= EE_IN_INTERVAL_TREE;
2628                 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2629                 if (err)
2630                         goto out_interrupted;
2631                 spin_lock_irq(&device->resource->req_lock);
2632                 err = handle_write_conflicts(device, peer_req);
2633                 if (err) {
2634                         spin_unlock_irq(&device->resource->req_lock);
2635                         if (err == -ENOENT) {
2636                                 put_ldev(device);
2637                                 return 0;
2638                         }
2639                         goto out_interrupted;
2640                 }
2641         } else {
2642                 update_peer_seq(peer_device, peer_seq);
2643                 spin_lock_irq(&device->resource->req_lock);
2644         }
2645         /* TRIM and WRITE_SAME are processed synchronously,
2646          * we wait for all pending requests, respectively wait for
2647          * active_ee to become empty in drbd_submit_peer_request();
2648          * better not add ourselves here. */
2649         if ((peer_req->flags & (EE_IS_TRIM|EE_WRITE_SAME)) == 0)
2650                 list_add_tail(&peer_req->w.list, &device->active_ee);
2651         spin_unlock_irq(&device->resource->req_lock);
2652
2653         if (device->state.conn == C_SYNC_TARGET)
2654                 wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2655
2656         if (device->state.pdsk < D_INCONSISTENT) {
2657                 /* In case we have the only disk of the cluster: mark out of sync and cover it by the activity log. */
2658                 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2659                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2660                 drbd_al_begin_io(device, &peer_req->i);
2661                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2662         }
2663
2664         err = drbd_submit_peer_request(device, peer_req, op, op_flags,
2665                                        DRBD_FAULT_DT_WR);
2666         if (!err)
2667                 return 0;
2668
2669         /* don't care for the reason here */
2670         drbd_err(device, "submit failed, triggering re-connect\n");
2671         spin_lock_irq(&device->resource->req_lock);
2672         list_del(&peer_req->w.list);
2673         drbd_remove_epoch_entry_interval(device, peer_req);
2674         spin_unlock_irq(&device->resource->req_lock);
2675         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
2676                 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
2677                 drbd_al_complete_io(device, &peer_req->i);
2678         }
2679
2680 out_interrupted:
2681         drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT | EV_CLEANUP);
2682         put_ldev(device);
2683         drbd_free_peer_req(device, peer_req);
2684         return err;
2685 }
2686
2687 /* We may throttle resync, if the lower device seems to be busy,
2688  * and current sync rate is above c_min_rate.
2689  *
2690  * To decide whether or not the lower device is busy, we use a scheme similar
2691  * to MD RAID's is_mddev_idle(): if the partition stats reveal "significant"
2692  * (more than 64 sectors) of activity we cannot account for with our own resync
2693  * activity, it obviously is "busy".
2694  *
2695  * The current sync rate used here uses only the most recent two step marks,
2696  * to have a short time average so we can react faster.
2697  */
2698 bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
2699                 bool throttle_if_app_is_waiting)
2700 {
2701         struct lc_element *tmp;
2702         bool throttle = drbd_rs_c_min_rate_throttle(device);
2703
2704         if (!throttle || throttle_if_app_is_waiting)
2705                 return throttle;
2706
2707         spin_lock_irq(&device->al_lock);
2708         tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2709         if (tmp) {
2710                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2711                 if (test_bit(BME_PRIORITY, &bm_ext->flags))
2712                         throttle = false;
2713                 /* Do not slow down if app IO is already waiting for this extent,
2714                  * and our progress is necessary for application IO to complete. */
2715         }
2716         spin_unlock_irq(&device->al_lock);
2717
2718         return throttle;
2719 }
2720
2721 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2722 {
2723         struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2724         unsigned long db, dt, dbdt;
2725         unsigned int c_min_rate;
2726         int curr_events;
2727
2728         rcu_read_lock();
2729         c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2730         rcu_read_unlock();
2731
2732         /* feature disabled? */
2733         if (c_min_rate == 0)
2734                 return false;
2735
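             /* All sectors the backing device has seen (reads plus writes), minus the
              * resync I/O we submitted ourselves (rs_sect_ev); the difference
              * approximates application and other I/O we cannot account for. */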
2736         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2737                       (int)part_stat_read(&disk->part0, sectors[1]) -
2738                         atomic_read(&device->rs_sect_ev);
2739
2740         if (atomic_read(&device->ap_actlog_cnt)
2741             || curr_events - device->rs_last_events > 64) {
2742                 unsigned long rs_left;
2743                 int i;
2744
2745                 device->rs_last_events = curr_events;
2746
2747                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2748                  * approx. */
2749                 i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2750
2751                 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2752                         rs_left = device->ov_left;
2753                 else
2754                         rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2755
2756                 dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2757                 if (!dt)
2758                         dt++;
2759                 db = device->rs_mark_left[i] - rs_left;
2760                 dbdt = Bit2KB(db/dt);
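                /* Rough example, assuming the usual 4 KiB covered per bitmap bit:
                 * db = 2048 bits resynced over dt = 2 seconds gives
                 * dbdt = Bit2KB(1024) = 4096 KiB/s; we throttle only if that
                 * exceeds the configured c_min_rate. */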
2761
2762                 if (dbdt > c_min_rate)
2763                         return true;
2764         }
2765         return false;
2766 }
2767
2768 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2769 {
2770         struct drbd_peer_device *peer_device;
2771         struct drbd_device *device;
2772         sector_t sector;
2773         sector_t capacity;
2774         struct drbd_peer_request *peer_req;
2775         struct digest_info *di = NULL;
2776         int size, verb;
2777         unsigned int fault_type;
2778         struct p_block_req *p = pi->data;
2779
2780         peer_device = conn_peer_device(connection, pi->vnr);
2781         if (!peer_device)
2782                 return -EIO;
2783         device = peer_device->device;
2784         capacity = drbd_get_capacity(device->this_bdev);
2785
2786         sector = be64_to_cpu(p->sector);
2787         size   = be32_to_cpu(p->blksize);
2788
2789         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2790                 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2791                                 (unsigned long long)sector, size);
2792                 return -EINVAL;
2793         }
2794         if (sector + (size>>9) > capacity) {
2795                 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2796                                 (unsigned long long)sector, size);
2797                 return -EINVAL;
2798         }
2799
2800         if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2801                 verb = 1;
2802                 switch (pi->cmd) {
2803                 case P_DATA_REQUEST:
2804                         drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2805                         break;
2806                 case P_RS_THIN_REQ:
2807                 case P_RS_DATA_REQUEST:
2808                 case P_CSUM_RS_REQUEST:
2809                 case P_OV_REQUEST:
2810                         drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY, p);
2811                         break;
2812                 case P_OV_REPLY:
2813                         verb = 0;
2814                         dec_rs_pending(device);
2815                         drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2816                         break;
2817                 default:
2818                         BUG();
2819                 }
2820                 if (verb && __ratelimit(&drbd_ratelimit_state))
2821                         drbd_err(device, "Can not satisfy peer's read request, "
2822                             "no local data.\n");
2823
2824                 /* drain the possibly remaining payload */
2825                 return drbd_drain_block(peer_device, pi->size);
2826         }
2827
2828         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2829          * "criss-cross" setup, that might cause write-out on some other DRBD,
2830          * which in turn might block on the other node at this very place.  */
2831         peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2832                         size, GFP_NOIO);
2833         if (!peer_req) {
2834                 put_ldev(device);
2835                 return -ENOMEM;
2836         }
2837
2838         switch (pi->cmd) {
2839         case P_DATA_REQUEST:
2840                 peer_req->w.cb = w_e_end_data_req;
2841                 fault_type = DRBD_FAULT_DT_RD;
2842                 /* application IO, don't drbd_rs_begin_io */
2843                 peer_req->flags |= EE_APPLICATION;
2844                 goto submit;
2845
2846         case P_RS_THIN_REQ:
2847                 /* If at some point in the future we have a smart way to
2848                    find out if this data block is completely deallocated,
2849                    then we would do something smarter here than reading
2850                    the block... */
2851                 peer_req->flags |= EE_RS_THIN_REQ;
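                /* fall through: otherwise handled like a regular resync data request */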
2852         case P_RS_DATA_REQUEST:
2853                 peer_req->w.cb = w_e_end_rsdata_req;
2854                 fault_type = DRBD_FAULT_RS_RD;
2855                 /* used in the sector offset progress display */
2856                 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2857                 break;
2858
2859         case P_OV_REPLY:
2860         case P_CSUM_RS_REQUEST:
2861                 fault_type = DRBD_FAULT_RS_RD;
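                /* one allocation holds both struct digest_info and the digest
                 * payload; di->digest is pointed just past the struct below */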
2862                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2863                 if (!di)
2864                         goto out_free_e;
2865
2866                 di->digest_size = pi->size;
2867                 di->digest = (((char *)di)+sizeof(struct digest_info));
2868
2869                 peer_req->digest = di;
2870                 peer_req->flags |= EE_HAS_DIGEST;
2871
2872                 if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2873                         goto out_free_e;
2874
2875                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2876                         D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2877                         peer_req->w.cb = w_e_end_csum_rs_req;
2878                         /* used in the sector offset progress display */
2879                         device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2880                         /* remember to report stats in drbd_resync_finished */
2881                         device->use_csums = true;
2882                 } else if (pi->cmd == P_OV_REPLY) {
2883                         /* track progress, we may need to throttle */
2884                         atomic_add(size >> 9, &device->rs_sect_in);
2885                         peer_req->w.cb = w_e_end_ov_reply;
2886                         dec_rs_pending(device);
2887                         /* drbd_rs_begin_io done when we sent this request,
2888                          * but accounting still needs to be done. */
2889                         goto submit_for_resync;
2890                 }
2891                 break;
2892
2893         case P_OV_REQUEST:
2894                 if (device->ov_start_sector == ~(sector_t)0 &&
2895                     peer_device->connection->agreed_pro_version >= 90) {
2896                         unsigned long now = jiffies;
2897                         int i;
2898                         device->ov_start_sector = sector;
2899                         device->ov_position = sector;
2900                         device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2901                         device->rs_total = device->ov_left;
2902                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2903                                 device->rs_mark_left[i] = device->ov_left;
2904                                 device->rs_mark_time[i] = now;
2905                         }
2906                         drbd_info(device, "Online Verify start sector: %llu\n",
2907                                         (unsigned long long)sector);
2908                 }
2909                 peer_req->w.cb = w_e_end_ov_req;
2910                 fault_type = DRBD_FAULT_RS_RD;
2911                 break;
2912
2913         default:
2914                 BUG();
2915         }
2916
2917         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2918          * wrt the receiver, but it is not as straightforward as it may seem.
2919          * Various places in the resync start and stop logic assume resync
2920          * requests are processed in order, requeuing this on the worker thread
2921          * introduces a bunch of new code for synchronization between threads.
2922          *
2923          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2924          * "forever", throttling after drbd_rs_begin_io will lock that extent
2925          * for application writes for the same time.  For now, just throttle
2926          * here, where the rest of the code expects the receiver to sleep for
2927          * a while, anyways.
2928          */
2929
2930         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2931          * this defers syncer requests for some time, before letting at least
2932          * one request through.  The resync controller on the receiving side
2933          * will adapt to the incoming rate accordingly.
2934          *
2935          * We cannot throttle here if remote is Primary/SyncTarget:
2936          * we would also throttle its application reads.
2937          * In that case, throttling is done on the SyncTarget only.
2938          */
2939
2940         /* Even though this may be a resync request, we do add to "read_ee";
2941          * "sync_ee" is only used for resync WRITEs.
2942          * Add to list early, so debugfs can find this request
2943          * even if we have to sleep below. */
2944         spin_lock_irq(&device->resource->req_lock);
2945         list_add_tail(&peer_req->w.list, &device->read_ee);
2946         spin_unlock_irq(&device->resource->req_lock);
2947
2948         update_receiver_timing_details(connection, drbd_rs_should_slow_down);
2949         if (device->state.peer != R_PRIMARY
2950         && drbd_rs_should_slow_down(device, sector, false))
2951                 schedule_timeout_uninterruptible(HZ/10);
2952         update_receiver_timing_details(connection, drbd_rs_begin_io);
2953         if (drbd_rs_begin_io(device, sector))
2954                 goto out_free_e;
2955
2956 submit_for_resync:
2957         atomic_add(size >> 9, &device->rs_sect_ev);
2958
2959 submit:
2960         update_receiver_timing_details(connection, drbd_submit_peer_request);
2961         inc_unacked(device);
2962         if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
2963                                      fault_type) == 0)
2964                 return 0;
2965
2966         /* don't care for the reason here */
2967         drbd_err(device, "submit failed, triggering re-connect\n");
2968
2969 out_free_e:
2970         spin_lock_irq(&device->resource->req_lock);
2971         list_del(&peer_req->w.list);
2972         spin_unlock_irq(&device->resource->req_lock);
2973         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2974
2975         put_ldev(device);
2976         drbd_free_peer_req(device, peer_req);
2977         return -EIO;
2978 }
2979
2980 /**
2981  * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
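 *
 * As used by drbd_sync_handshake() below: a return value of 1 means keep the
 * local data (the peer becomes sync target), -1 means discard the local data
 * (this node becomes sync target), and -100 means no automatic decision.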
2982  */
2983 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
2984 {
2985         struct drbd_device *device = peer_device->device;
2986         int self, peer, rv = -100;
2987         unsigned long ch_self, ch_peer;
2988         enum drbd_after_sb_p after_sb_0p;
2989
2990         self = device->ldev->md.uuid[UI_BITMAP] & 1;
2991         peer = device->p_uuid[UI_BITMAP] & 1;
2992
2993         ch_peer = device->p_uuid[UI_SIZE];
2994         ch_self = device->comm_bm_set;
2995
2996         rcu_read_lock();
2997         after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
2998         rcu_read_unlock();
2999         switch (after_sb_0p) {
3000         case ASB_CONSENSUS:
3001         case ASB_DISCARD_SECONDARY:
3002         case ASB_CALL_HELPER:
3003         case ASB_VIOLENTLY:
3004                 drbd_err(device, "Configuration error.\n");
3005                 break;
3006         case ASB_DISCONNECT:
3007                 break;
3008         case ASB_DISCARD_YOUNGER_PRI:
3009                 if (self == 0 && peer == 1) {
3010                         rv = -1;
3011                         break;
3012                 }
3013                 if (self == 1 && peer == 0) {
3014                         rv =  1;
3015                         break;
3016                 }
3017                 /* Else fall through to one of the other strategies... */
3018         case ASB_DISCARD_OLDER_PRI:
3019                 if (self == 0 && peer == 1) {
3020                         rv = 1;
3021                         break;
3022                 }
3023                 if (self == 1 && peer == 0) {
3024                         rv = -1;
3025                         break;
3026                 }
3027                 /* Else fall through to one of the other strategies... */
3028                 drbd_warn(device, "Discard younger/older primary did not find a decision\n"
3029                      "Using discard-least-changes instead\n");
3030         case ASB_DISCARD_ZERO_CHG:
3031                 if (ch_peer == 0 && ch_self == 0) {
3032                         rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
3033                                 ? -1 : 1;
3034                         break;
3035                 } else {
3036                         if (ch_peer == 0) { rv =  1; break; }
3037                         if (ch_self == 0) { rv = -1; break; }
3038                 }
3039                 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
3040                         break;
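                /* else fall through to discard-least-changes */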
3041         case ASB_DISCARD_LEAST_CHG:
3042                 if      (ch_self < ch_peer)
3043                         rv = -1;
3044                 else if (ch_self > ch_peer)
3045                         rv =  1;
3046                 else /* ( ch_self == ch_peer ) */
3047                      /* Well, then use something else. */
3048                         rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
3049                                 ? -1 : 1;
3050                 break;
3051         case ASB_DISCARD_LOCAL:
3052                 rv = -1;
3053                 break;
3054         case ASB_DISCARD_REMOTE:
3055                 rv =  1;
3056         }
3057
3058         return rv;
3059 }
3060
3061 /**
3062  * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
3063  */
3064 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
3065 {
3066         struct drbd_device *device = peer_device->device;
3067         int hg, rv = -100;
3068         enum drbd_after_sb_p after_sb_1p;
3069
3070         rcu_read_lock();
3071         after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
3072         rcu_read_unlock();
3073         switch (after_sb_1p) {
3074         case ASB_DISCARD_YOUNGER_PRI:
3075         case ASB_DISCARD_OLDER_PRI:
3076         case ASB_DISCARD_LEAST_CHG:
3077         case ASB_DISCARD_LOCAL:
3078         case ASB_DISCARD_REMOTE:
3079         case ASB_DISCARD_ZERO_CHG:
3080                 drbd_err(device, "Configuration error.\n");
3081                 break;
3082         case ASB_DISCONNECT:
3083                 break;
3084         case ASB_CONSENSUS:
3085                 hg = drbd_asb_recover_0p(peer_device);
3086                 if (hg == -1 && device->state.role == R_SECONDARY)
3087                         rv = hg;
3088                 if (hg == 1  && device->state.role == R_PRIMARY)
3089                         rv = hg;
3090                 break;
3091         case ASB_VIOLENTLY:
3092                 rv = drbd_asb_recover_0p(peer_device);
3093                 break;
3094         case ASB_DISCARD_SECONDARY:
3095                 return device->state.role == R_PRIMARY ? 1 : -1;
3096         case ASB_CALL_HELPER:
3097                 hg = drbd_asb_recover_0p(peer_device);
3098                 if (hg == -1 && device->state.role == R_PRIMARY) {
3099                         enum drbd_state_rv rv2;
3100
3101                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3102                           * we might be here in C_WF_REPORT_PARAMS which is transient.
3103                           * we do not need to wait for the after state change work either. */
3104                         rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3105                         if (rv2 != SS_SUCCESS) {
3106                                 drbd_khelper(device, "pri-lost-after-sb");
3107                         } else {
3108                                 drbd_warn(device, "Successfully gave up primary role.\n");
3109                                 rv = hg;
3110                         }
3111                 } else
3112                         rv = hg;
3113         }
3114
3115         return rv;
3116 }
3117
3118 /**
3119  * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
3120  */
3121 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
3122 {
3123         struct drbd_device *device = peer_device->device;
3124         int hg, rv = -100;
3125         enum drbd_after_sb_p after_sb_2p;
3126
3127         rcu_read_lock();
3128         after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
3129         rcu_read_unlock();
3130         switch (after_sb_2p) {
3131         case ASB_DISCARD_YOUNGER_PRI:
3132         case ASB_DISCARD_OLDER_PRI:
3133         case ASB_DISCARD_LEAST_CHG:
3134         case ASB_DISCARD_LOCAL:
3135         case ASB_DISCARD_REMOTE:
3136         case ASB_CONSENSUS:
3137         case ASB_DISCARD_SECONDARY:
3138         case ASB_DISCARD_ZERO_CHG:
3139                 drbd_err(device, "Configuration error.\n");
3140                 break;
3141         case ASB_VIOLENTLY:
3142                 rv = drbd_asb_recover_0p(peer_device);
3143                 break;
3144         case ASB_DISCONNECT:
3145                 break;
3146         case ASB_CALL_HELPER:
3147                 hg = drbd_asb_recover_0p(peer_device);
3148                 if (hg == -1) {
3149                         enum drbd_state_rv rv2;
3150
3151                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3152                           * we might be here in C_WF_REPORT_PARAMS which is transient.
3153                           * we do not need to wait for the after state change work either. */
3154                         rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3155                         if (rv2 != SS_SUCCESS) {
3156                                 drbd_khelper(device, "pri-lost-after-sb");
3157                         } else {
3158                                 drbd_warn(device, "Successfully gave up primary role.\n");
3159                                 rv = hg;
3160                         }
3161                 } else
3162                         rv = hg;
3163         }
3164
3165         return rv;
3166 }
3167
3168 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
3169                            u64 bits, u64 flags)
3170 {
3171         if (!uuid) {
3172                 drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
3173                 return;
3174         }
3175         drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
3176              text,
3177              (unsigned long long)uuid[UI_CURRENT],
3178              (unsigned long long)uuid[UI_BITMAP],
3179              (unsigned long long)uuid[UI_HISTORY_START],
3180              (unsigned long long)uuid[UI_HISTORY_END],
3181              (unsigned long long)bits,
3182              (unsigned long long)flags);
3183 }
3184
3185 /*
3186   100   after split brain try auto recover
3187     2   C_SYNC_SOURCE set BitMap
3188     1   C_SYNC_SOURCE use BitMap
3189     0   no Sync
3190    -1   C_SYNC_TARGET use BitMap
3191    -2   C_SYNC_TARGET set BitMap
3192  -100   after split brain, disconnect
3193 -1000   unrelated data
3194 -1091   requires proto 91
3195 -1096   requires proto 96
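 < -0x10000  to resolve, both sides must additionally support at least
             protocol (-value & 0xff) and feature flags ((-value >> 8) & 0xff);
             see rule 41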
3196  */
3197
3198 static int drbd_uuid_compare(struct drbd_device *const device, enum drbd_role const peer_role, int *rule_nr) __must_hold(local)
3199 {
3200         struct drbd_peer_device *const peer_device = first_peer_device(device);
3201         struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
3202         u64 self, peer;
3203         int i, j;
3204
3205         self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
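        /* The lowest bit of each UUID is used as a flag (compare the "& 1"
         * tests in drbd_asb_recover_0p() above), so mask it off before
         * comparing UUID values. */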
3206         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3207
3208         *rule_nr = 10;
3209         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
3210                 return 0;
3211
3212         *rule_nr = 20;
3213         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
3214              peer != UUID_JUST_CREATED)
3215                 return -2;
3216
3217         *rule_nr = 30;
3218         if (self != UUID_JUST_CREATED &&
3219             (peer == UUID_JUST_CREATED || peer == (u64)0))
3220                 return 2;
3221
3222         if (self == peer) {
3223                 int rct, dc; /* roles at crash time */
3224
3225                 if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
3226
3227                         if (connection->agreed_pro_version < 91)
3228                                 return -1091;
3229
3230                         if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
3231                             (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
3232                                 drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
3233                                 drbd_uuid_move_history(device);
3234                                 device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
3235                                 device->ldev->md.uuid[UI_BITMAP] = 0;
3236
3237                                 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3238                                                device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3239                                 *rule_nr = 34;
3240                         } else {
3241                                 drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
3242                                 *rule_nr = 36;
3243                         }
3244
3245                         return 1;
3246                 }
3247
3248                 if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
3249
3250                         if (connection->agreed_pro_version < 91)
3251                                 return -1091;
3252
3253                         if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
3254                             (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
3255                                 drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
3256
3257                                 device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
3258                                 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
3259                                 device->p_uuid[UI_BITMAP] = 0UL;
3260
3261                                 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3262                                 *rule_nr = 35;
3263                         } else {
3264                                 drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
3265                                 *rule_nr = 37;
3266                         }
3267
3268                         return -1;
3269                 }
3270
3271                 /* Common power [off|failure] */
3272                 rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
3273                         (device->p_uuid[UI_FLAGS] & 2);
3274                 /* lowest bit is set when we were primary,
3275                  * next bit (weight 2) is set when peer was primary */
3276                 *rule_nr = 40;
3277
3278                 /* Neither has the "crashed primary" flag set,
3279                  * only a replication link hiccup. */
3280                 if (rct == 0)
3281                         return 0;
3282
3283                 /* Current UUID equal and no bitmap uuid; does not necessarily
3284                  * mean this was a "simultaneous hard crash", maybe IO was
3285                  * frozen, so no UUID-bump happened.
3286                  * This is a protocol change, overload DRBD_FF_WSAME as flag
3287                  * for "new-enough" peer DRBD version. */
3288                 if (device->state.role == R_PRIMARY || peer_role == R_PRIMARY) {
3289                         *rule_nr = 41;
3290                         if (!(connection->agreed_features & DRBD_FF_WSAME)) {
3291                                 drbd_warn(peer_device, "Equivalent unrotated UUIDs, but current primary present.\n");
3292                                 return -(0x10000 | PRO_VERSION_MAX | (DRBD_FF_WSAME << 8));
3293                         }
3294                         if (device->state.role == R_PRIMARY && peer_role == R_PRIMARY) {
3295                                 /* At least one has the "crashed primary" bit set,
3296                                  * both are primary now, but neither has rotated its UUIDs?
3297                                  * "Can not happen." */
3298                                 drbd_err(peer_device, "Equivalent unrotated UUIDs, but both are primary. Can not resolve this.\n");
3299                                 return -100;
3300                         }
3301                         if (device->state.role == R_PRIMARY)
3302                                 return 1;
3303                         return -1;
3304                 }
3305
3306                 /* Both are secondary.
3307                  * Really looks like recovery from simultaneous hard crash.
3308                  * Check which had been primary before, and arbitrate. */
3309                 switch (rct) {
3310                 case 0: /* !self_pri && !peer_pri */ return 0; /* already handled */
3311                 case 1: /*  self_pri && !peer_pri */ return 1;
3312                 case 2: /* !self_pri &&  peer_pri */ return -1;
3313                 case 3: /*  self_pri &&  peer_pri */
3314                         dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
3315                         return dc ? -1 : 1;
3316                 }
3317         }
3318
3319         *rule_nr = 50;
3320         peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3321         if (self == peer)
3322                 return -1;
3323
3324         *rule_nr = 51;
3325         peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
3326         if (self == peer) {
3327                 if (connection->agreed_pro_version < 96 ?
3328                     (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
3329                     (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
3330                     peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
3331                         /* The last P_SYNC_UUID did not get through.  Undo the peer's
3332                            UUID modifications from its last start of resync as sync source. */
3333
3334                         if (connection->agreed_pro_version < 91)
3335                                 return -1091;
3336
3337                         device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
3338                         device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
3339
3340                         drbd_info(device, "Lost last syncUUID packet, corrected:\n");
3341                         drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3342
3343                         return -1;
3344                 }
3345         }
3346
3347         *rule_nr = 60;
3348         self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3349         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3350                 peer = device->p_uuid[i] & ~((u64)1);
3351                 if (self == peer)
3352                         return -2;
3353         }
3354
3355         *rule_nr = 70;
3356         self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3357         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3358         if (self == peer)
3359                 return 1;
3360
3361         *rule_nr = 71;
3362         self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
3363         if (self == peer) {
3364                 if (connection->agreed_pro_version < 96 ?
3365                     (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
3366                     (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
3367                     self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
3368                         /* The last P_SYNC_UUID did not get through.  Undo our own
3369                            UUID modifications from our last start of resync as sync source. */
3370
3371                         if (connection->agreed_pro_version < 91)
3372                                 return -1091;
3373
3374                         __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
3375                         __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
3376
3377                         drbd_info(device, "Last syncUUID did not get through, corrected:\n");
3378                         drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3379                                        device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3380
3381                         return 1;
3382                 }
3383         }
3384
3385
3386         *rule_nr = 80;
3387         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3388         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3389                 self = device->ldev->md.uuid[i] & ~((u64)1);
3390                 if (self == peer)
3391                         return 2;
3392         }
3393
3394         *rule_nr = 90;
3395         self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3396         peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3397         if (self == peer && self != ((u64)0))
3398                 return 100;
3399
3400         *rule_nr = 100;
3401         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3402                 self = device->ldev->md.uuid[i] & ~((u64)1);
3403                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3404                         peer = device->p_uuid[j] & ~((u64)1);
3405                         if (self == peer)
3406                                 return -100;
3407                 }
3408         }
3409
3410         return -1000;
3411 }
3412
3413 /* drbd_sync_handshake() returns the new conn state on success, or
3414    C_MASK (-1) on failure.
3415  */
3416 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3417                                            enum drbd_role peer_role,
3418                                            enum drbd_disk_state peer_disk) __must_hold(local)
3419 {
3420         struct drbd_device *device = peer_device->device;
3421         enum drbd_conns rv = C_MASK;
3422         enum drbd_disk_state mydisk;
3423         struct net_conf *nc;
3424         int hg, rule_nr, rr_conflict, tentative, always_asbp;
3425
3426         mydisk = device->state.disk;
3427         if (mydisk == D_NEGOTIATING)
3428                 mydisk = device->new_state_tmp.disk;
3429
3430         drbd_info(device, "drbd_sync_handshake:\n");
3431
3432         spin_lock_irq(&device->ldev->md.uuid_lock);
3433         drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3434         drbd_uuid_dump(device, "peer", device->p_uuid,
3435                        device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3436
3437         hg = drbd_uuid_compare(device, peer_role, &rule_nr);
3438         spin_unlock_irq(&device->ldev->md.uuid_lock);
3439
3440         drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3441
3442         if (hg == -1000) {
3443                 drbd_alert(device, "Unrelated data, aborting!\n");
3444                 return C_MASK;
3445         }
3446         if (hg < -0x10000) {
3447                 int proto, fflags;
3448                 hg = -hg;
3449                 proto = hg & 0xff;
3450                 fflags = (hg >> 8) & 0xff;
3451                 drbd_alert(device, "To resolve this both sides have to support at least protocol %d and feature flags 0x%x\n",
3452                                         proto, fflags);
3453                 return C_MASK;
3454         }
3455         if (hg < -1000) {
3456                 drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3457                 return C_MASK;
3458         }
3459
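        /* Per the rule table above, abs(hg) == 2 means "set bitmap", i.e. full
         * sync.  If the UUID comparison asked for a full sync, or could not
         * decide at all (-100), keep that while forcing the direction from the
         * disk states: the doubling below turns the +-1 back into +-2. */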
3460         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3461             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
3462                 int f = (hg == -100) || abs(hg) == 2;
3463                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
3464                 if (f)
3465                         hg = hg*2;
3466                 drbd_info(device, "Becoming sync %s due to disk states.\n",
3467                      hg > 0 ? "source" : "target");
3468         }
3469
3470         if (abs(hg) == 100)
3471                 drbd_khelper(device, "initial-split-brain");
3472
3473         rcu_read_lock();
3474         nc = rcu_dereference(peer_device->connection->net_conf);
3475         always_asbp = nc->always_asbp;
3476         rr_conflict = nc->rr_conflict;
3477         tentative = nc->tentative;
3478         rcu_read_unlock();
3479
3480         if (hg == 100 || (hg == -100 && always_asbp)) {
3481                 int pcount = (device->state.role == R_PRIMARY)
3482                            + (peer_role == R_PRIMARY);
3483                 int forced = (hg == -100);
3484
3485                 switch (pcount) {
3486                 case 0:
3487                         hg = drbd_asb_recover_0p(peer_device);
3488                         break;
3489                 case 1:
3490                         hg = drbd_asb_recover_1p(peer_device);
3491                         break;
3492                 case 2:
3493                         hg = drbd_asb_recover_2p(peer_device);
3494                         break;
3495                 }
3496                 if (abs(hg) < 100) {
3497                         drbd_warn(device, "Split-Brain detected, %d primaries, "
3498                              "automatically solved. Sync from %s node\n",
3499                              pcount, (hg < 0) ? "peer" : "this");
3500                         if (forced) {
3501                                 drbd_warn(device, "Doing a full sync, since"
3502                                      " UUIDs were ambiguous.\n");
3503                                 hg = hg*2;
3504                         }
3505                 }
3506         }
3507
3508         if (hg == -100) {
3509                 if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3510                         hg = -1;
3511                 if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3512                         hg = 1;
3513
3514                 if (abs(hg) < 100)
3515                         drbd_warn(device, "Split-Brain detected, manually solved. "
3516                              "Sync from %s node\n",
3517                              (hg < 0) ? "peer" : "this");
3518         }
3519
3520         if (hg == -100) {
3521                 /* FIXME this log message is not correct if we end up here
3522                  * after an attempted attach on a diskless node.
3523                  * We just refuse to attach -- well, we drop the "connection"
3524                  * to that disk, in a way... */
3525                 drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3526                 drbd_khelper(device, "split-brain");
3527                 return C_MASK;
3528         }
3529
3530         if (hg > 0 && mydisk <= D_INCONSISTENT) {
3531                 drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3532                 return C_MASK;
3533         }
3534
3535         if (hg < 0 && /* by intention we do not use mydisk here. */
3536             device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3537                 switch (rr_conflict) {
3538                 case ASB_CALL_HELPER:
3539                         drbd_khelper(device, "pri-lost");
3540                         /* fall through */
3541                 case ASB_DISCONNECT:
3542                         drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3543                         return C_MASK;
3544                 case ASB_VIOLENTLY:
3545                         drbd_warn(device, "Becoming SyncTarget, violating the stable-data "
3546                              "assumption\n");
3547                 }
3548         }
3549
3550         if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3551                 if (hg == 0)
3552                         drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3553                 else
3554                         drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
3555                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3556                                  abs(hg) >= 2 ? "full" : "bit-map based");
3557                 return C_MASK;
3558         }
3559
3560         if (abs(hg) >= 2) {
3561                 drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3562                 if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3563                                         BM_LOCKED_SET_ALLOWED))
3564                         return C_MASK;
3565         }
3566
3567         if (hg > 0) { /* become sync source. */
3568                 rv = C_WF_BITMAP_S;
3569         } else if (hg < 0) { /* become sync target */
3570                 rv = C_WF_BITMAP_T;
3571         } else {
3572                 rv = C_CONNECTED;
3573                 if (drbd_bm_total_weight(device)) {
3574                         drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3575                              drbd_bm_total_weight(device));
3576                 }
3577         }
3578
3579         return rv;
3580 }
3581
3582 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3583 {
3584         /* ASB_DISCARD_REMOTE on one side paired with ASB_DISCARD_LOCAL on the other is valid */
3585         if (peer == ASB_DISCARD_REMOTE)
3586                 return ASB_DISCARD_LOCAL;
3587
3588         /* any other combination involving ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL is invalid */
3589         if (peer == ASB_DISCARD_LOCAL)
3590                 return ASB_DISCARD_REMOTE;
3591
3592         /* everything else is valid if they are equal on both sides. */
3593         return peer;
3594 }
3595
3596 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3597 {
3598         struct p_protocol *p = pi->data;
3599         enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3600         int p_proto, p_discard_my_data, p_two_primaries, cf;
3601         struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3602         char integrity_alg[SHARED_SECRET_MAX] = "";
3603         struct crypto_ahash *peer_integrity_tfm = NULL;
3604         void *int_dig_in = NULL, *int_dig_vv = NULL;
3605
3606         p_proto         = be32_to_cpu(p->protocol);
3607         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3608         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3609         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3610         p_two_primaries = be32_to_cpu(p->two_primaries);
3611         cf              = be32_to_cpu(p->conn_flags);
3612         p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3613
3614         if (connection->agreed_pro_version >= 87) {
3615                 int err;
3616
3617                 if (pi->size > sizeof(integrity_alg))
3618                         return -EIO;
3619                 err = drbd_recv_all(connection, integrity_alg, pi->size);
3620                 if (err)
3621                         return err;
3622                 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3623         }
3624
3625         if (pi->cmd != P_PROTOCOL_UPDATE) {
3626                 clear_bit(CONN_DRY_RUN, &connection->flags);
3627
3628                 if (cf & CF_DRY_RUN)
3629                         set_bit(CONN_DRY_RUN, &connection->flags);
3630
3631                 rcu_read_lock();
3632                 nc = rcu_dereference(connection->net_conf);
3633
3634                 if (p_proto != nc->wire_protocol) {
3635                         drbd_err(connection, "incompatible %s settings\n", "protocol");
3636                         goto disconnect_rcu_unlock;
3637                 }
3638
3639                 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3640                         drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3641                         goto disconnect_rcu_unlock;
3642                 }
3643
3644                 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3645                         drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3646                         goto disconnect_rcu_unlock;
3647                 }
3648
3649                 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3650                         drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3651                         goto disconnect_rcu_unlock;
3652                 }
3653
3654                 if (p_discard_my_data && nc->discard_my_data) {
3655                         drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3656                         goto disconnect_rcu_unlock;
3657                 }
3658
3659                 if (p_two_primaries != nc->two_primaries) {
3660                         drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3661                         goto disconnect_rcu_unlock;
3662                 }
3663
3664                 if (strcmp(integrity_alg, nc->integrity_alg)) {
3665                         drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3666                         goto disconnect_rcu_unlock;
3667                 }
3668
3669                 rcu_read_unlock();
3670         }
3671
3672         if (integrity_alg[0]) {
3673                 int hash_size;
3674
3675                 /*
3676                  * We can only change the peer data integrity algorithm
3677                  * here.  Changing our own data integrity algorithm
3678                  * requires that we send a P_PROTOCOL_UPDATE packet at
3679                  * the same time; otherwise, the peer has no way to
3680                  * tell between which packets the algorithm should
3681                  * change.
3682                  */
3683
3684                 peer_integrity_tfm = crypto_alloc_ahash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3685                 if (IS_ERR(peer_integrity_tfm)) {
3686                         peer_integrity_tfm = NULL;
3687                         drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3688                                  integrity_alg);
3689                         goto disconnect;
3690                 }
3691
3692                 hash_size = crypto_ahash_digestsize(peer_integrity_tfm);
3693                 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3694                 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3695                 if (!(int_dig_in && int_dig_vv)) {
3696                         drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3697                         goto disconnect;
3698                 }
3699         }
3700
3701         new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3702         if (!new_net_conf) {
3703                 drbd_err(connection, "Allocation of new net_conf failed\n");
3704                 goto disconnect;
3705         }
3706
3707         mutex_lock(&connection->data.mutex);
3708         mutex_lock(&connection->resource->conf_update);
3709         old_net_conf = connection->net_conf;
3710         *new_net_conf = *old_net_conf;
3711
3712         new_net_conf->wire_protocol = p_proto;
3713         new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3714         new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3715         new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3716         new_net_conf->two_primaries = p_two_primaries;
3717
3718         rcu_assign_pointer(connection->net_conf, new_net_conf);
3719         mutex_unlock(&connection->resource->conf_update);
3720         mutex_unlock(&connection->data.mutex);
3721
3722         crypto_free_ahash(connection->peer_integrity_tfm);
3723         kfree(connection->int_dig_in);
3724         kfree(connection->int_dig_vv);
3725         connection->peer_integrity_tfm = peer_integrity_tfm;
3726         connection->int_dig_in = int_dig_in;
3727         connection->int_dig_vv = int_dig_vv;
3728
3729         if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3730                 drbd_info(connection, "peer data-integrity-alg: %s\n",
3731                           integrity_alg[0] ? integrity_alg : "(none)");
3732
3733         synchronize_rcu();
3734         kfree(old_net_conf);
3735         return 0;
3736
3737 disconnect_rcu_unlock:
3738         rcu_read_unlock();
3739 disconnect:
3740         crypto_free_ahash(peer_integrity_tfm);
3741         kfree(int_dig_in);
3742         kfree(int_dig_vv);
3743         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3744         return -EIO;
3745 }
3746
3747 /* helper function
3748  * input: alg name, feature name
3749  * return: NULL (alg name was "")
3750  *         ERR_PTR(error) if something goes wrong
3751  *         or the crypto hash ptr, if it worked out ok. */
3752 static struct crypto_ahash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
3753                 const char *alg, const char *name)
3754 {
3755         struct crypto_ahash *tfm;
3756
3757         if (!alg[0])
3758                 return NULL;
3759
3760         tfm = crypto_alloc_ahash(alg, 0, CRYPTO_ALG_ASYNC);
3761         if (IS_ERR(tfm)) {
3762                 drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3763                         alg, name, PTR_ERR(tfm));
3764                 return tfm;
3765         }
3766         return tfm;
3767 }
3768
3769 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3770 {
3771         void *buffer = connection->data.rbuf;
3772         int size = pi->size;
3773
3774         while (size) {
3775                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3776                 s = drbd_recv(connection, buffer, s);
3777                 if (s <= 0) {
3778                         if (s < 0)
3779                                 return s;
3780                         break;
3781                 }
3782                 size -= s;
3783         }
3784         if (size)
3785                 return -EIO;
3786         return 0;
3787 }
3788
3789 /*
3790  * config_unknown_volume  -  device configuration command for unknown volume
3791  *
3792  * When a device is added to an existing connection, the node on which the
3793  * device is added first will send configuration commands to its peer but the
3794  * peer will not know about the device yet.  It will warn and ignore these
3795  * commands.  Once the device is added on the second node, the second node will
3796  * send the same device configuration commands, but in the other direction.
3797  *
3798  * (We can also end up here if drbd is misconfigured.)
3799  */
3800 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3801 {
3802         drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3803                   cmdname(pi->cmd), pi->vnr);
3804         return ignore_remaining_packet(connection, pi);
3805 }
3806
3807 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3808 {
3809         struct drbd_peer_device *peer_device;
3810         struct drbd_device *device;
3811         struct p_rs_param_95 *p;
3812         unsigned int header_size, data_size, exp_max_sz;
3813         struct crypto_ahash *verify_tfm = NULL;
3814         struct crypto_ahash *csums_tfm = NULL;
3815         struct net_conf *old_net_conf, *new_net_conf = NULL;
3816         struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3817         const int apv = connection->agreed_pro_version;
3818         struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3819         int fifo_size = 0;
3820         int err;
3821
3822         peer_device = conn_peer_device(connection, pi->vnr);
3823         if (!peer_device)
3824                 return config_unknown_volume(connection, pi);
3825         device = peer_device->device;
3826
3827         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3828                     : apv == 88 ? sizeof(struct p_rs_param)
3829                                         + SHARED_SECRET_MAX
3830                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3831                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3832
3833         if (pi->size > exp_max_sz) {
3834                 drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3835                     pi->size, exp_max_sz);
3836                 return -EIO;
3837         }
3838
3839         if (apv <= 88) {
3840                 header_size = sizeof(struct p_rs_param);
3841                 data_size = pi->size - header_size;
3842         } else if (apv <= 94) {
3843                 header_size = sizeof(struct p_rs_param_89);
3844                 data_size = pi->size - header_size;
3845                 D_ASSERT(device, data_size == 0);
3846         } else {
3847                 header_size = sizeof(struct p_rs_param_95);
3848                 data_size = pi->size - header_size;
3849                 D_ASSERT(device, data_size == 0);
3850         }
3851
3852         /* initialize verify_alg and csums_alg */
3853         p = pi->data;
3854         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3855
3856         err = drbd_recv_all(peer_device->connection, p, header_size);
3857         if (err)
3858                 return err;
3859
3860         mutex_lock(&connection->resource->conf_update);
3861         old_net_conf = peer_device->connection->net_conf;
3862         if (get_ldev(device)) {
3863                 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3864                 if (!new_disk_conf) {
3865                         put_ldev(device);
3866                         mutex_unlock(&connection->resource->conf_update);
3867                         drbd_err(device, "Allocation of new disk_conf failed\n");
3868                         return -ENOMEM;
3869                 }
3870
3871                 old_disk_conf = device->ldev->disk_conf;
3872                 *new_disk_conf = *old_disk_conf;
3873
3874                 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3875         }
3876
3877         if (apv >= 88) {
3878                 if (apv == 88) {
3879                         if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3880                                 drbd_err(device, "verify-alg of wrong size, "
3881                                         "peer wants %u, accepting only up to %u bytes\n",
3882                                         data_size, SHARED_SECRET_MAX);
3883                                 err = -EIO;
3884                                 goto reconnect;
3885                         }
3886
3887                         err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3888                         if (err)
3889                                 goto reconnect;
3890                         /* we expect NUL terminated string */
3891                         /* but just in case someone tries to be evil */
3892                         D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3893                         p->verify_alg[data_size-1] = 0;
3894
3895                 } else /* apv >= 89 */ {
3896                         /* we still expect NUL terminated strings */
3897                         /* but just in case someone tries to be evil */
3898                         D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3899                         D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3900                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3901                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3902                 }
3903
3904                 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3905                         if (device->state.conn == C_WF_REPORT_PARAMS) {
3906                                 drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3907                                     old_net_conf->verify_alg, p->verify_alg);
3908                                 goto disconnect;
3909                         }
3910                         verify_tfm = drbd_crypto_alloc_digest_safe(device,
3911                                         p->verify_alg, "verify-alg");
3912                         if (IS_ERR(verify_tfm)) {
3913                                 verify_tfm = NULL;
3914                                 goto disconnect;
3915                         }
3916                 }
3917
3918                 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3919                         if (device->state.conn == C_WF_REPORT_PARAMS) {
3920                                 drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3921                                     old_net_conf->csums_alg, p->csums_alg);
3922                                 goto disconnect;
3923                         }
3924                         csums_tfm = drbd_crypto_alloc_digest_safe(device,
3925                                         p->csums_alg, "csums-alg");
3926                         if (IS_ERR(csums_tfm)) {
3927                                 csums_tfm = NULL;
3928                                 goto disconnect;
3929                         }
3930                 }
3931
3932                 if (apv > 94 && new_disk_conf) {
3933                         new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3934                         new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3935                         new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3936                         new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3937
3938                         fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3939                         if (fifo_size != device->rs_plan_s->size) {
3940                                 new_plan = fifo_alloc(fifo_size);
3941                                 if (!new_plan) {
3942                                         drbd_err(device, "kmalloc of fifo_buffer failed\n");
3943                                         put_ldev(device);
3944                                         goto disconnect;
3945                                 }
3946                         }
3947                 }
3948
3949                 if (verify_tfm || csums_tfm) {
3950                         new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3951                         if (!new_net_conf) {
3952                                 drbd_err(device, "Allocation of new net_conf failed\n");
3953                                 goto disconnect;
3954                         }
3955
3956                         *new_net_conf = *old_net_conf;
3957
3958                         if (verify_tfm) {
3959                                 strcpy(new_net_conf->verify_alg, p->verify_alg);
3960                                 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3961                                 crypto_free_ahash(peer_device->connection->verify_tfm);
3962                                 peer_device->connection->verify_tfm = verify_tfm;
3963                                 drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3964                         }
3965                         if (csums_tfm) {
3966                                 strcpy(new_net_conf->csums_alg, p->csums_alg);
3967                                 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3968                                 crypto_free_ahash(peer_device->connection->csums_tfm);
3969                                 peer_device->connection->csums_tfm = csums_tfm;
3970                                 drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3971                         }
3972                         rcu_assign_pointer(connection->net_conf, new_net_conf);
3973                 }
3974         }
3975
3976         if (new_disk_conf) {
3977                 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3978                 put_ldev(device);
3979         }
3980
3981         if (new_plan) {
3982                 old_plan = device->rs_plan_s;
3983                 rcu_assign_pointer(device->rs_plan_s, new_plan);
3984         }
3985
3986         mutex_unlock(&connection->resource->conf_update);
3987         synchronize_rcu();
3988         if (new_net_conf)
3989                 kfree(old_net_conf);
3990         kfree(old_disk_conf);
3991         kfree(old_plan);
3992
3993         return 0;
3994
3995 reconnect:
3996         if (new_disk_conf) {
3997                 put_ldev(device);
3998                 kfree(new_disk_conf);
3999         }
4000         mutex_unlock(&connection->resource->conf_update);
4001         return -EIO;
4002
4003 disconnect:
4004         kfree(new_plan);
4005         if (new_disk_conf) {
4006                 put_ldev(device);
4007                 kfree(new_disk_conf);
4008         }
4009         mutex_unlock(&connection->resource->conf_update);
4010         /* just for completeness: actually not needed,
4011          * as this is not reached if csums_tfm was ok. */
4012         crypto_free_ahash(csums_tfm);
4013         /* but free the verify_tfm again, if csums_tfm did not work out */
4014         crypto_free_ahash(verify_tfm);
4015         conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4016         return -EIO;
4017 }
4018
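/*
 * "More than 12.5%" below means the absolute difference exceeds one eighth
 * (x >> 3) of either value.  Example: a = 1024 and b = 900 sectors give
 * d = 124, which is larger than 900 >> 3 = 112, so a warning is logged.
 */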
4019 /* warn if the arguments differ by more than 12.5% */
4020 static void warn_if_differ_considerably(struct drbd_device *device,
4021         const char *s, sector_t a, sector_t b)
4022 {
4023         sector_t d;
4024         if (a == 0 || b == 0)
4025                 return;
4026         d = (a > b) ? (a - b) : (b - a);
4027         if (d > (a>>3) || d > (b>>3))
4028                 drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
4029                      (unsigned long long)a, (unsigned long long)b);
4030 }
4031
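/*
 * receive_sizes() handles the peer's P_SIZES packet: d_size is the peer's
 * backing disk capacity, u_size the user requested size, and c_size the size
 * the peer currently exports.  With a local disk we renegotiate a common size
 * (possibly rewriting disk_conf->disk_size); when diskless we adopt a
 * suitable size reported by the peer instead.
 */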
4032 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
4033 {
4034         struct drbd_peer_device *peer_device;
4035         struct drbd_device *device;
4036         struct p_sizes *p = pi->data;
4037         struct o_qlim *o = (connection->agreed_features & DRBD_FF_WSAME) ? p->qlim : NULL;
4038         enum determine_dev_size dd = DS_UNCHANGED;
4039         sector_t p_size, p_usize, p_csize, my_usize;
4040         sector_t new_size, cur_size;
4041         int ldsc = 0; /* local disk size changed */
4042         enum dds_flags ddsf;
4043
4044         peer_device = conn_peer_device(connection, pi->vnr);
4045         if (!peer_device)
4046                 return config_unknown_volume(connection, pi);
4047         device = peer_device->device;
4048         cur_size = drbd_get_capacity(device->this_bdev);
4049
4050         p_size = be64_to_cpu(p->d_size);
4051         p_usize = be64_to_cpu(p->u_size);
4052         p_csize = be64_to_cpu(p->c_size);
4053
4054         /* just store the peer's disk size for now.
4055          * we still need to figure out whether we accept that. */
4056         device->p_size = p_size;
4057
4058         if (get_ldev(device)) {
4059                 rcu_read_lock();
4060                 my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
4061                 rcu_read_unlock();
4062
4063                 warn_if_differ_considerably(device, "lower level device sizes",
4064                            p_size, drbd_get_max_capacity(device->ldev));
4065                 warn_if_differ_considerably(device, "user requested size",
4066                                             p_usize, my_usize);
4067
4068                 /* if this is the first connect, or an otherwise expected
4069                  * param exchange, choose the minimum */
4070                 if (device->state.conn == C_WF_REPORT_PARAMS)
4071                         p_usize = min_not_zero(my_usize, p_usize);
4072
4073                 /* Never shrink a device with usable data during connect.
4074                    But allow online shrinking if we are connected. */
4075                 new_size = drbd_new_dev_size(device, device->ldev, p_usize, 0);
4076                 if (new_size < cur_size &&
4077                     device->state.disk >= D_OUTDATED &&
4078                     device->state.conn < C_CONNECTED) {
4079                         drbd_err(device, "The peer's disk size is too small! (%llu < %llu sectors)\n",
4080                                         (unsigned long long)new_size, (unsigned long long)cur_size);
4081                         conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4082                         put_ldev(device);
4083                         return -EIO;
4084                 }
4085
4086                 if (my_usize != p_usize) {
4087                         struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
4088
4089                         new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
4090                         if (!new_disk_conf) {
4091                                 drbd_err(device, "Allocation of new disk_conf failed\n");
4092                                 put_ldev(device);
4093                                 return -ENOMEM;
4094                         }
4095
4096                         mutex_lock(&connection->resource->conf_update);
4097                         old_disk_conf = device->ldev->disk_conf;
4098                         *new_disk_conf = *old_disk_conf;
4099                         new_disk_conf->disk_size = p_usize;
4100
4101                         rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
4102                         mutex_unlock(&connection->resource->conf_update);
4103                         synchronize_rcu();
4104                         kfree(old_disk_conf);
4105
4106                         drbd_info(device, "Peer sets u_size to %lu sectors\n",
4107                                  (unsigned long)p_usize);
4108                 }
4109
4110                 put_ldev(device);
4111         }
4112
4113         device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
4114         /* Keep the call to drbd_reconsider_queue_parameters() before
4115            drbd_determine_dev_size().  In case we cleared QUEUE_FLAG_DISCARD
4116            from our queue in drbd_reconsider_queue_parameters(), we can be sure
4117            that after drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
4118
4119         ddsf = be16_to_cpu(p->dds_flags);
4120         if (get_ldev(device)) {
4121                 drbd_reconsider_queue_parameters(device, device->ldev, o);
4122                 dd = drbd_determine_dev_size(device, ddsf, NULL);
4123                 put_ldev(device);
4124                 if (dd == DS_ERROR)
4125                         return -EIO;
4126                 drbd_md_sync(device);
4127         } else {
4128                 /*
4129                  * I am diskless, need to accept the peer's *current* size.
4130                  * I must NOT accept the peer's backing disk size,
4131                  * it may have been larger than mine all along...
4132                  *
4133                  * At this point, the peer knows more about my disk, or at
4134                  * least about what we last agreed upon, than I do.
4135                  * So if his c_size is less than his d_size, the most likely
4136                  * reason is that *my* d_size was smaller last time we checked.
4137                  *
4138                  * However, if he sends a zero current size,
4139                  * take his (user-capped or) backing disk size anyway.
4140                  *
4141                  * Unless of course he does not have a disk himself,
4142                  * in which case we ignore this completely.
4143                  */
4144                 sector_t new_size = p_csize ?: p_usize ?: p_size;
4145                 drbd_reconsider_queue_parameters(device, NULL, o);
4146                 if (new_size == 0) {
4147                         /* Ignore, peer does not know anything. */
4148                 } else if (new_size == cur_size) {
4149                         /* nothing to do */
4150                 } else if (cur_size != 0 && p_size == 0) {
4151                         drbd_warn(device, "Ignored diskless peer device size (peer:%llu != me:%llu sectors)!\n",
4152                                         (unsigned long long)new_size, (unsigned long long)cur_size);
4153                 } else if (new_size < cur_size && device->state.role == R_PRIMARY) {
4154                         drbd_err(device, "The peer's device size is too small! (%llu < %llu sectors); demote me first!\n",
4155                                         (unsigned long long)new_size, (unsigned long long)cur_size);
4156                         conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4157                         return -EIO;
4158                 } else {
4159                         /* I believe the peer, if
4160                          *  - I don't have a current size myself
4161                          *  - we agree on the size anyways
4162                          *  - we agree on the size anyway
4163                          *    and he has the only disk
4164                          *  - I do have a current size, am Primary,
4165                          *    and he has the only disk,
4166                          *    which is larger than my current size
4167                          */
4168                         drbd_set_my_capacity(device, new_size);
4169                 }
4170         }
4171
4172         if (get_ldev(device)) {
4173                 if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
4174                         device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
4175                         ldsc = 1;
4176                 }
4177
4178                 put_ldev(device);
4179         }
4180
4181         if (device->state.conn > C_WF_REPORT_PARAMS) {
4182                 if (be64_to_cpu(p->c_size) !=
4183                     drbd_get_capacity(device->this_bdev) || ldsc) {
4184                         /* we have different sizes, probably peer
4185                          * needs to know my new size... */
4186                         drbd_send_sizes(peer_device, 0, ddsf);
4187                 }
4188                 if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
4189                     (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
4190                         if (device->state.pdsk >= D_INCONSISTENT &&
4191                             device->state.disk >= D_INCONSISTENT) {
4192                                 if (ddsf & DDSF_NO_RESYNC)
4193                                         drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
4194                                 else
4195                                         resync_after_online_grow(device);
4196                         } else
4197                                 set_bit(RESYNC_AFTER_NEG, &device->flags);
4198                 }
4199         }
4200
4201         return 0;
4202 }
4203
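/*
 * receive_uuids() stores the peer's data generation UUIDs in device->p_uuid.
 * If our current UUID is still UUID_JUST_CREATED and the peer set the
 * skip-initial-sync flag (0x8 in its UUID flags), the bitmap is cleared and
 * both disks go straight to UpToDate, skipping the initial full sync.
 */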
4204 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
4205 {
4206         struct drbd_peer_device *peer_device;
4207         struct drbd_device *device;
4208         struct p_uuids *p = pi->data;
4209         u64 *p_uuid;
4210         int i, updated_uuids = 0;
4211
4212         peer_device = conn_peer_device(connection, pi->vnr);
4213         if (!peer_device)
4214                 return config_unknown_volume(connection, pi);
4215         device = peer_device->device;
4216
4217         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
4218         if (!p_uuid) {
4219                 drbd_err(device, "kmalloc of p_uuid failed\n");
4220                 return false;
4221         }
4222
4223         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
4224                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
4225
4226         kfree(device->p_uuid);
4227         device->p_uuid = p_uuid;
4228
4229         if ((device->state.conn < C_CONNECTED || device->state.pdsk == D_DISKLESS) &&
4230             device->state.disk < D_INCONSISTENT &&
4231             device->state.role == R_PRIMARY &&
4232             (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
4233                 drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
4234                     (unsigned long long)device->ed_uuid);
4235                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4236                 return -EIO;
4237         }
4238
4239         if (get_ldev(device)) {
4240                 int skip_initial_sync =
4241                         device->state.conn == C_CONNECTED &&
4242                         peer_device->connection->agreed_pro_version >= 90 &&
4243                         device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
4244                         (p_uuid[UI_FLAGS] & 8);
4245                 if (skip_initial_sync) {
4246                         drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
4247                         drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
4248                                         "clear_n_write from receive_uuids",
4249                                         BM_LOCKED_TEST_ALLOWED);
4250                         _drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
4251                         _drbd_uuid_set(device, UI_BITMAP, 0);
4252                         _drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
4253                                         CS_VERBOSE, NULL);
4254                         drbd_md_sync(device);
4255                         updated_uuids = 1;
4256                 }
4257                 put_ldev(device);
4258         } else if (device->state.disk < D_INCONSISTENT &&
4259                    device->state.role == R_PRIMARY) {
4260                 /* I am a diskless primary, the peer just created a new current UUID
4261                    for me. */
4262                 updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4263         }
4264
4265         /* Before we test for the disk state, we should wait until a possibly
4266            ongoing cluster-wide state change has finished. That is important if
4267            we are primary and are detaching from our disk. We need to see the
4268            new disk state... */
4269         mutex_lock(device->state_mutex);
4270         mutex_unlock(device->state_mutex);
4271         if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
4272                 updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4273
4274         if (updated_uuids)
4275                 drbd_print_uuids(device, "receiver updated UUIDs to");
4276
4277         return 0;
4278 }
4279
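/*
 * Example of the conversion below: if the peer requests a change to
 * conn = StartingSyncS with role = Primary and disk = UpToDate, we see that
 * as conn = StartingSyncT with peer = Primary and pdsk = UpToDate;
 * role/peer and disk/pdsk are swapped, and the connection state is mirrored
 * through c_tab[].
 */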
4280 /**
4281  * convert_state() - Converts the peer's view of the cluster state to our point of view
4282  * @ps:         The state as seen by the peer.
4283  */
4284 static union drbd_state convert_state(union drbd_state ps)
4285 {
4286         union drbd_state ms;
4287
4288         static enum drbd_conns c_tab[] = {
4289                 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
4290                 [C_CONNECTED] = C_CONNECTED,
4291
4292                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
4293                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
4294                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
4295                 [C_VERIFY_S]       = C_VERIFY_T,
4296                 [C_MASK]   = C_MASK,
4297         };
4298
4299         ms.i = ps.i;
4300
4301         ms.conn = c_tab[ps.conn];
4302         ms.peer = ps.role;
4303         ms.role = ps.peer;
4304         ms.pdsk = ps.disk;
4305         ms.disk = ps.pdsk;
4306         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
4307
4308         return ms;
4309 }
4310
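/*
 * receive_req_state(): the peer asks us to perform a state change on its
 * behalf.  mask/val arrive in the peer's point of view and are converted to
 * ours before being applied; the result is reported back via
 * drbd_send_sr_reply().
 */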
4311 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
4312 {
4313         struct drbd_peer_device *peer_device;
4314         struct drbd_device *device;
4315         struct p_req_state *p = pi->data;
4316         union drbd_state mask, val;
4317         enum drbd_state_rv rv;
4318
4319         peer_device = conn_peer_device(connection, pi->vnr);
4320         if (!peer_device)
4321                 return -EIO;
4322         device = peer_device->device;
4323
4324         mask.i = be32_to_cpu(p->mask);
4325         val.i = be32_to_cpu(p->val);
4326
4327         if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
4328             mutex_is_locked(device->state_mutex)) {
4329                 drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
4330                 return 0;
4331         }
4332
4333         mask = convert_state(mask);
4334         val = convert_state(val);
4335
4336         rv = drbd_change_state(device, CS_VERBOSE, mask, val);
4337         drbd_send_sr_reply(peer_device, rv);
4338
4339         drbd_md_sync(device);
4340
4341         return 0;
4342 }
4343
4344 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
4345 {
4346         struct p_req_state *p = pi->data;
4347         union drbd_state mask, val;
4348         enum drbd_state_rv rv;
4349
4350         mask.i = be32_to_cpu(p->mask);
4351         val.i = be32_to_cpu(p->val);
4352
4353         if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4354             mutex_is_locked(&connection->cstate_mutex)) {
4355                 conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
4356                 return 0;
4357         }
4358
4359         mask = convert_state(mask);
4360         val = convert_state(val);
4361
4362         rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4363         conn_send_sr_reply(connection, rv);
4364
4365         return 0;
4366 }
4367
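/*
 * receive_state() processes the peer's current state (P_STATE).  It may
 * trigger a resync handshake via drbd_sync_handshake(), then commits the
 * combined state under req_lock; if the local state changed concurrently,
 * the read-modify-write of the state is retried.
 */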
4368 static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
4369 {
4370         struct drbd_peer_device *peer_device;
4371         struct drbd_device *device;
4372         struct p_state *p = pi->data;
4373         union drbd_state os, ns, peer_state;
4374         enum drbd_disk_state real_peer_disk;
4375         enum chg_state_flags cs_flags;
4376         int rv;
4377
4378         peer_device = conn_peer_device(connection, pi->vnr);
4379         if (!peer_device)
4380                 return config_unknown_volume(connection, pi);
4381         device = peer_device->device;
4382
4383         peer_state.i = be32_to_cpu(p->state);
4384
4385         real_peer_disk = peer_state.disk;
4386         if (peer_state.disk == D_NEGOTIATING) {
4387                 real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
4388                 drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
4389         }
4390
4391         spin_lock_irq(&device->resource->req_lock);
4392  retry:
4393         os = ns = drbd_read_state(device);
4394         spin_unlock_irq(&device->resource->req_lock);
4395
4396         /* If some other part of the code (ack_receiver thread, timeout)
4397          * already decided to close the connection again,
4398          * we must not "re-establish" it here. */
4399         if (os.conn <= C_TEAR_DOWN)
4400                 return -ECONNRESET;
4401
4402         /* If this is the "end of sync" confirmation, usually the peer disk
4403          * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
4404          * set) resync started in PausedSyncT, or if the timing of pause-/
4405          * unpause-sync events has been "just right", the peer disk may
4406          * transition from D_CONSISTENT to D_UP_TO_DATE as well.
4407          */
4408         if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
4409             real_peer_disk == D_UP_TO_DATE &&
4410             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
4411                 /* If we are (becoming) SyncSource, but peer is still in sync
4412                  * preparation, ignore its uptodate-ness to avoid flapping, it
4413                  * will change to inconsistent once the peer reaches active
4414                  * syncing states.
4415                  * It may have changed syncer-paused flags, however, so we
4416                  * cannot ignore this completely. */
4417                 if (peer_state.conn > C_CONNECTED &&
4418                     peer_state.conn < C_SYNC_SOURCE)
4419                         real_peer_disk = D_INCONSISTENT;
4420
4421                 /* if peer_state changes to connected at the same time,
4422                  * it explicitly notifies us that it finished resync.
4423                  * Maybe we should finish it up, too? */
4424                 else if (os.conn >= C_SYNC_SOURCE &&
4425                          peer_state.conn == C_CONNECTED) {
4426                         if (drbd_bm_total_weight(device) <= device->rs_failed)
4427                                 drbd_resync_finished(device);
4428                         return 0;
4429                 }
4430         }
4431
4432         /* explicit verify finished notification, stop sector reached. */
4433         if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
4434             peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
4435                 ov_out_of_sync_print(device);
4436                 drbd_resync_finished(device);
4437                 return 0;
4438         }
4439
4440         /* peer says his disk is inconsistent, while we think it is uptodate,
4441          * and this happens while the peer still thinks we have a sync going on,
4442          * but we think we are already done with the sync.
4443          * We ignore this to avoid flapping pdsk.
4444          * This should not happen, if the peer is a recent version of drbd. */
4445         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4446             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4447                 real_peer_disk = D_UP_TO_DATE;
4448
4449         if (ns.conn == C_WF_REPORT_PARAMS)
4450                 ns.conn = C_CONNECTED;
4451
4452         if (peer_state.conn == C_AHEAD)
4453                 ns.conn = C_BEHIND;
4454
4455         /* TODO:
4456          * if (primary and diskless and peer uuid != effective uuid)
4457          *     abort attach on peer;
4458          *
4459          * If this node does not have good data, was already connected, but
4460          * the peer did a late attach only now, trying to "negotiate" with me,
4461          * AND I am currently Primary, possibly frozen, with some specific
4462          * "effective" uuid, this should never be reached, really, because
4463          * we first send the uuids, then the current state.
4464          *
4465          * In this scenario, we already dropped the connection hard
4466          * when we received the unsuitable uuids (receive_uuids()).
4467          *
4468          * Should we want to change this, that is: not drop the connection in
4469          * receive_uuids() already, then we would need to add a branch here
4470          * that aborts the attach of "unsuitable uuids" on the peer in case
4471          * this node is currently Diskless Primary.
4472          */
4473
4474         if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4475             get_ldev_if_state(device, D_NEGOTIATING)) {
4476                 int cr; /* consider resync */
4477
4478                 /* if we established a new connection */
4479                 cr  = (os.conn < C_CONNECTED);
4480                 /* if we had an established connection
4481                  * and one of the nodes newly attaches a disk */
4482                 cr |= (os.conn == C_CONNECTED &&
4483                        (peer_state.disk == D_NEGOTIATING ||
4484                         os.disk == D_NEGOTIATING));
4485                 /* if we have both been inconsistent, and the peer has been
4486                  * forced to be UpToDate with --overwrite-data */
4487                 cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4488                 /* if we had been plain connected, and the admin requested to
4489                  * start a sync by "invalidate" or "invalidate-remote" */
4490                 cr |= (os.conn == C_CONNECTED &&
4491                                 (peer_state.conn >= C_STARTING_SYNC_S &&
4492                                  peer_state.conn <= C_WF_BITMAP_T));
4493
4494                 if (cr)
4495                         ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
4496
4497                 put_ldev(device);
4498                 if (ns.conn == C_MASK) {
4499                         ns.conn = C_CONNECTED;
4500                         if (device->state.disk == D_NEGOTIATING) {
4501                                 drbd_force_state(device, NS(disk, D_FAILED));
4502                         } else if (peer_state.disk == D_NEGOTIATING) {
4503                                 drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4504                                 peer_state.disk = D_DISKLESS;
4505                                 real_peer_disk = D_DISKLESS;
4506                         } else {
4507                                 if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4508                                         return -EIO;
4509                                 D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4510                                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4511                                 return -EIO;
4512                         }
4513                 }
4514         }
4515
4516         spin_lock_irq(&device->resource->req_lock);
4517         if (os.i != drbd_read_state(device).i)
4518                 goto retry;
4519         clear_bit(CONSIDER_RESYNC, &device->flags);
4520         ns.peer = peer_state.role;
4521         ns.pdsk = real_peer_disk;
4522         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4523         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4524                 ns.disk = device->new_state_tmp.disk;
4525         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4526         if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4527             test_bit(NEW_CUR_UUID, &device->flags)) {
4528                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4529                    for temporary network outages! */
4530                 spin_unlock_irq(&device->resource->req_lock);
4531                 drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4532                 tl_clear(peer_device->connection);
4533                 drbd_uuid_new_current(device);
4534                 clear_bit(NEW_CUR_UUID, &device->flags);
4535                 conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4536                 return -EIO;
4537         }
4538         rv = _drbd_set_state(device, ns, cs_flags, NULL);
4539         ns = drbd_read_state(device);
4540         spin_unlock_irq(&device->resource->req_lock);
4541
4542         if (rv < SS_SUCCESS) {
4543                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4544                 return -EIO;
4545         }
4546
4547         if (os.conn > C_WF_REPORT_PARAMS) {
4548                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4549                     peer_state.disk != D_NEGOTIATING ) {
4550                         /* we want resync, peer has not yet decided to sync... */
4551                         /* Nowadays this is only used when forcing a node into the primary
4552                            role and setting its disk to UpToDate in the same step */
4553                         drbd_send_uuids(peer_device);
4554                         drbd_send_current_state(peer_device);
4555                 }
4556         }
4557
4558         clear_bit(DISCARD_MY_DATA, &device->flags);
4559
4560         drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4561
4562         return 0;
4563 }
4564
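/*
 * receive_sync_uuid(): as SyncTarget, wait until we reached WFSyncUUID (or
 * Behind, or the connection/disk degraded), then adopt the sync UUID sent by
 * the SyncSource and start the resync.
 */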
4565 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4566 {
4567         struct drbd_peer_device *peer_device;
4568         struct drbd_device *device;
4569         struct p_rs_uuid *p = pi->data;
4570
4571         peer_device = conn_peer_device(connection, pi->vnr);
4572         if (!peer_device)
4573                 return -EIO;
4574         device = peer_device->device;
4575
4576         wait_event(device->misc_wait,
4577                    device->state.conn == C_WF_SYNC_UUID ||
4578                    device->state.conn == C_BEHIND ||
4579                    device->state.conn < C_CONNECTED ||
4580                    device->state.disk < D_NEGOTIATING);
4581
4582         /* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
4583
4584         /* Here the _drbd_uuid_ functions are right, current should
4585            _not_ be rotated into the history */
4586         if (get_ldev_if_state(device, D_NEGOTIATING)) {
4587                 _drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4588                 _drbd_uuid_set(device, UI_BITMAP, 0UL);
4589
4590                 drbd_print_uuids(device, "updated sync uuid");
4591                 drbd_start_resync(device, C_SYNC_TARGET);
4592
4593                 put_ldev(device);
4594         } else
4595                 drbd_err(device, "Ignoring SyncUUID packet!\n");
4596
4597         return 0;
4598 }
4599
4600 /**
4601  * receive_bitmap_plain() - receive one chunk of an uncompressed bitmap
4602  *
4603  * Return 0 when done, 1 when another iteration is needed, and a negative error
4604  * code upon failure.
4605  */
4606 static int
4607 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4608                      unsigned long *p, struct bm_xfer_ctx *c)
4609 {
4610         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4611                                  drbd_header_size(peer_device->connection);
4612         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4613                                        c->bm_words - c->word_offset);
4614         unsigned int want = num_words * sizeof(*p);
4615         int err;
4616
4617         if (want != size) {
4618                 drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4619                 return -EIO;
4620         }
4621         if (want == 0)
4622                 return 0;
4623         err = drbd_recv_all(peer_device->connection, p, want);
4624         if (err)
4625                 return err;
4626
4627         drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4628
4629         c->word_offset += num_words;
4630         c->bit_offset = c->word_offset * BITS_PER_LONG;
4631         if (c->bit_offset > c->bm_bits)
4632                 c->bit_offset = c->bm_bits;
4633
4634         return 1;
4635 }
4636
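/*
 * Layout of p_compressed_bm->encoding, as decoded by the helpers below:
 *   bits 0-3: bitmap encoding (enum drbd_bitmap_code, e.g. RLE_VLI_Bits)
 *   bits 4-6: number of padding bits in the last byte of the bit stream
 *   bit  7:   whether the first decoded run describes set bits
 */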
4637 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4638 {
4639         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4640 }
4641
4642 static int dcbp_get_start(struct p_compressed_bm *p)
4643 {
4644         return (p->encoding & 0x80) != 0;
4645 }
4646
4647 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4648 {
4649         return (p->encoding >> 4) & 0x7;
4650 }
4651
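/*
 * The RLE stream is a sequence of VLI encoded run lengths that alternate
 * between runs of clear and set bits; only the "set" runs are applied via
 * _drbd_bm_set_bits().  Example: with bit_offset 0 and the start flag clear,
 * run lengths 5, 3, 10 leave bits 0..4 clear, set bits 5..7, and leave
 * bits 8..17 clear.
 */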
4652 /**
4653  * recv_bm_rle_bits() - decode one run-length/VLI encoded chunk of the bitmap
4654  *
4655  * Return 0 when done, 1 when another iteration is needed, and a negative error
4656  * code upon failure.
4657  */
4658 static int
4659 recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4660                 struct p_compressed_bm *p,
4661                  struct bm_xfer_ctx *c,
4662                  unsigned int len)
4663 {
4664         struct bitstream bs;
4665         u64 look_ahead;
4666         u64 rl;
4667         u64 tmp;
4668         unsigned long s = c->bit_offset;
4669         unsigned long e;
4670         int toggle = dcbp_get_start(p);
4671         int have;
4672         int bits;
4673
4674         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4675
4676         bits = bitstream_get_bits(&bs, &look_ahead, 64);
4677         if (bits < 0)
4678                 return -EIO;
4679
4680         for (have = bits; have > 0; s += rl, toggle = !toggle) {
4681                 bits = vli_decode_bits(&rl, look_ahead);
4682                 if (bits <= 0)
4683                         return -EIO;
4684
4685                 if (toggle) {
4686                         e = s + rl - 1;
4687                         if (e >= c->bm_bits) {
4688                                 drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4689                                 return -EIO;
4690                         }
4691                         _drbd_bm_set_bits(peer_device->device, s, e);
4692                 }
4693
4694                 if (have < bits) {
4695                         drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4696                                 have, bits, look_ahead,
4697                                 (unsigned int)(bs.cur.b - p->code),
4698                                 (unsigned int)bs.buf_len);
4699                         return -EIO;
4700                 }
4701                 /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4702                 if (likely(bits < 64))
4703                         look_ahead >>= bits;
4704                 else
4705                         look_ahead = 0;
4706                 have -= bits;
4707
4708                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4709                 if (bits < 0)
4710                         return -EIO;
4711                 look_ahead |= tmp << have;
4712                 have += bits;
4713         }
4714
4715         c->bit_offset = s;
4716         bm_xfer_ctx_bit_to_word_offset(c);
4717
4718         return (s != c->bm_bits);
4719 }
4720
4721 /**
4722  * decode_bitmap_c() - decode one compressed bitmap packet
4723  *
4724  * Return 0 when done, 1 when another iteration is needed, and a negative error
4725  * code upon failure.
4726  */
4727 static int
4728 decode_bitmap_c(struct drbd_peer_device *peer_device,
4729                 struct p_compressed_bm *p,
4730                 struct bm_xfer_ctx *c,
4731                 unsigned int len)
4732 {
4733         if (dcbp_get_code(p) == RLE_VLI_Bits)
4734                 return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4735
4736         /* other variants had been implemented for evaluation,
4737          * but have been dropped as this one turned out to be "best"
4738          * during all our tests. */
4739
4740         drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4741         conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4742         return -EIO;
4743 }
4744
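/*
 * The compression ratio below is computed in tenths of a percent:
 * r = 1000 * total / plain is the per-mille actually transferred, and
 * 1000 - r the per-mille saved.  Example: plain = 100000 bytes and
 * total = 25000 bytes give r = 250, reported as "compression: 75.0%".
 */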
4745 void INFO_bm_xfer_stats(struct drbd_device *device,
4746                 const char *direction, struct bm_xfer_ctx *c)
4747 {
4748         /* what would it take to transfer it "plaintext" */
4749         unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4750         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4751         unsigned int plain =
4752                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4753                 c->bm_words * sizeof(unsigned long);
4754         unsigned int total = c->bytes[0] + c->bytes[1];
4755         unsigned int r;
4756
4757         /* total cannot be zero, but just in case: */
4758         if (total == 0)
4759                 return;
4760
4761         /* don't report if not compressed */
4762         if (total >= plain)
4763                 return;
4764
4765         /* total < plain. check for overflow, still */
4766         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4767                                     : (1000 * total / plain);
4768
4769         if (r > 1000)
4770                 r = 1000;
4771
4772         r = 1000 - r;
4773         drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4774              "total %u; compression: %u.%u%%\n",
4775                         direction,
4776                         c->bytes[1], c->packets[1],
4777                         c->bytes[0], c->packets[0],
4778                         total, r/10, r % 10);
4779 }
4780
4781 /* Since we are processing the bitfield from lower addresses to higher,
4782    it does not matter whether we process it in 32 bit chunks or 64 bit
4783    chunks as long as it is little endian. (Understand it as a byte stream,
4784    beginning with the lowest byte...) If we used big endian
4785    we would need to process it from the highest address to the lowest,
4786    in order to be agnostic to the 32 vs 64 bit issue.
4787
4788    Returns 0 on success, a negative error code otherwise. */
4789 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4790 {
4791         struct drbd_peer_device *peer_device;
4792         struct drbd_device *device;
4793         struct bm_xfer_ctx c;
4794         int err;
4795
4796         peer_device = conn_peer_device(connection, pi->vnr);
4797         if (!peer_device)
4798                 return -EIO;
4799         device = peer_device->device;
4800
4801         drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4802         /* you are supposed to send additional out-of-sync information
4803          * if you actually set bits during this phase */
4804
4805         c = (struct bm_xfer_ctx) {
4806                 .bm_bits = drbd_bm_bits(device),
4807                 .bm_words = drbd_bm_words(device),
4808         };
4809
4810         for (;;) {
4811                 if (pi->cmd == P_BITMAP)
4812                         err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4813                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4814                         /* MAYBE: sanity check that we speak proto >= 90,
4815                          * and the feature is enabled! */
4816                         struct p_compressed_bm *p = pi->data;
4817
4818                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4819                                 drbd_err(device, "ReportCBitmap packet too large\n");
4820                                 err = -EIO;
4821                                 goto out;
4822                         }
4823                         if (pi->size <= sizeof(*p)) {
4824                                 drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4825                                 err = -EIO;
4826                                 goto out;
4827                         }
4828                         err = drbd_recv_all(peer_device->connection, p, pi->size);
4829                         if (err)
4830                                goto out;
4831                         err = decode_bitmap_c(peer_device, p, &c, pi->size);
4832                 } else {
4833                         drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
4834                         err = -EIO;
4835                         goto out;
4836                 }
4837
4838                 c.packets[pi->cmd == P_BITMAP]++;
4839                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4840
4841                 if (err <= 0) {
4842                         if (err < 0)
4843                                 goto out;
4844                         break;
4845                 }
4846                 err = drbd_recv_header(peer_device->connection, pi);
4847                 if (err)
4848                         goto out;
4849         }
4850
4851         INFO_bm_xfer_stats(device, "receive", &c);
4852
4853         if (device->state.conn == C_WF_BITMAP_T) {
4854                 enum drbd_state_rv rv;
4855
4856                 err = drbd_send_bitmap(device);
4857                 if (err)
4858                         goto out;
4859                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4860                 rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4861                 D_ASSERT(device, rv == SS_SUCCESS);
4862         } else if (device->state.conn != C_WF_BITMAP_S) {
4863                 /* admin may have requested C_DISCONNECTING,
4864                  * other threads may have noticed network errors */
4865                 drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4866                     drbd_conn_str(device->state.conn));
4867         }
4868         err = 0;
4869
4870  out:
4871         drbd_bm_unlock(device);
4872         if (!err && device->state.conn == C_WF_BITMAP_S)
4873                 drbd_start_resync(device, C_SYNC_SOURCE);
4874         return err;
4875 }
4876
4877 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4878 {
4879         drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4880                  pi->cmd, pi->size);
4881
4882         return ignore_remaining_packet(connection, pi);
4883 }
4884
4885 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4886 {
4887         /* Make sure we've acked all the TCP data associated
4888          * with the data requests being unplugged */
4889         drbd_tcp_quickack(connection->data.socket);
4890
4891         return 0;
4892 }
4893
4894 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4895 {
4896         struct drbd_peer_device *peer_device;
4897         struct drbd_device *device;
4898         struct p_block_desc *p = pi->data;
4899
4900         peer_device = conn_peer_device(connection, pi->vnr);
4901         if (!peer_device)
4902                 return -EIO;
4903         device = peer_device->device;
4904
4905         switch (device->state.conn) {
4906         case C_WF_SYNC_UUID:
4907         case C_WF_BITMAP_T:
4908         case C_BEHIND:
4909                 break;
4910         default:
4911                 drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4912                                 drbd_conn_str(device->state.conn));
4913         }
4914
4915         drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4916
4917         return 0;
4918 }
4919
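/*
 * receive_rs_deallocated(): during resync the peer announced a block that is
 * deallocated (e.g. thinly provisioned) on its side.  If we have a local
 * disk, we punch the same hole with a local discard and acknowledge once it
 * completes; otherwise we answer with a negative ack right away.
 */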
4920 static int receive_rs_deallocated(struct drbd_connection *connection, struct packet_info *pi)
4921 {
4922         struct drbd_peer_device *peer_device;
4923         struct p_block_desc *p = pi->data;
4924         struct drbd_device *device;
4925         sector_t sector;
4926         int size, err = 0;
4927
4928         peer_device = conn_peer_device(connection, pi->vnr);
4929         if (!peer_device)
4930                 return -EIO;
4931         device = peer_device->device;
4932
4933         sector = be64_to_cpu(p->sector);
4934         size = be32_to_cpu(p->blksize);
4935
4936         dec_rs_pending(device);
4937
4938         if (get_ldev(device)) {
4939                 struct drbd_peer_request *peer_req;
4940                 const int op = REQ_OP_DISCARD;
4941
4942                 peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER, sector,
4943                                                size, 0, GFP_NOIO);
4944                 if (!peer_req) {
4945                         put_ldev(device);
4946                         return -ENOMEM;
4947                 }
4948
4949                 peer_req->w.cb = e_end_resync_block;
4950                 peer_req->submit_jif = jiffies;
4951                 peer_req->flags |= EE_IS_TRIM;
4952
4953                 spin_lock_irq(&device->resource->req_lock);
4954                 list_add_tail(&peer_req->w.list, &device->sync_ee);
4955                 spin_unlock_irq(&device->resource->req_lock);
4956
4957                 atomic_add(pi->size >> 9, &device->rs_sect_ev);
4958                 err = drbd_submit_peer_request(device, peer_req, op, 0, DRBD_FAULT_RS_WR);
4959
4960                 if (err) {
4961                         spin_lock_irq(&device->resource->req_lock);
4962                         list_del(&peer_req->w.list);
4963                         spin_unlock_irq(&device->resource->req_lock);
4964
4965                         drbd_free_peer_req(device, peer_req);
4966                         put_ldev(device);
4967                         err = 0;
4968                         goto fail;
4969                 }
4970
4971                 inc_unacked(device);
4972
4973                 /* No put_ldev() here. Gets called in drbd_endio_write_sec_final(),
4974                    as well as drbd_rs_complete_io() */
4975         } else {
4976         fail:
4977                 drbd_rs_complete_io(device, sector);
4978                 drbd_send_ack_ex(peer_device, P_NEG_ACK, sector, size, ID_SYNCER);
4979         }
4980
4981         atomic_add(size >> 9, &device->rs_sect_in);
4982
4983         return err;
4984 }
4985
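/*
 * Dispatch table for the main receiver: pkt_size is the size of the fixed
 * sub-header that is read in before the handler runs, and expect_payload
 * says whether additional payload beyond that sub-header is allowed.
 * drbdd() below enforces both before calling fn().
 */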
4986 struct data_cmd {
4987         int expect_payload;
4988         unsigned int pkt_size;
4989         int (*fn)(struct drbd_connection *, struct packet_info *);
4990 };
4991
4992 static struct data_cmd drbd_cmd_handler[] = {
4993         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4994         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4995         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4996         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4997         [P_BITMAP]          = { 1, 0, receive_bitmap } ,
4998         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4999         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
5000         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
5001         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
5002         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
5003         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
5004         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
5005         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
5006         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
5007         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
5008         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
5009         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
5010         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
5011         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
5012         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
5013         [P_RS_THIN_REQ]     = { 0, sizeof(struct p_block_req), receive_DataRequest },
5014         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
5015         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
5016         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
5017         [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
5018         [P_TRIM]            = { 0, sizeof(struct p_trim), receive_Data },
5019         [P_RS_DEALLOCATED]  = { 0, sizeof(struct p_block_desc), receive_rs_deallocated },
5020         [P_WSAME]           = { 1, sizeof(struct p_wsame), receive_Data },
5021 };
5022
5023 static void drbdd(struct drbd_connection *connection)
5024 {
5025         struct packet_info pi;
5026         size_t shs; /* sub header size */
5027         int err;
5028
5029         while (get_t_state(&connection->receiver) == RUNNING) {
5030                 struct data_cmd const *cmd;
5031
5032                 drbd_thread_current_set_cpu(&connection->receiver);
5033                 update_receiver_timing_details(connection, drbd_recv_header);
5034                 if (drbd_recv_header(connection, &pi))
5035                         goto err_out;
5036
5037                 cmd = &drbd_cmd_handler[pi.cmd];
5038                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
5039                         drbd_err(connection, "Unexpected data packet %s (0x%04x)",
5040                                  cmdname(pi.cmd), pi.cmd);
5041                         goto err_out;
5042                 }
5043
5044                 shs = cmd->pkt_size;
5045                 if (pi.cmd == P_SIZES && connection->agreed_features & DRBD_FF_WSAME)
5046                         shs += sizeof(struct o_qlim);
5047                 if (pi.size > shs && !cmd->expect_payload) {
5048                         drbd_err(connection, "No payload expected %s l:%d\n",
5049                                  cmdname(pi.cmd), pi.size);
5050                         goto err_out;
5051                 }
5052                 if (pi.size < shs) {
5053                         drbd_err(connection, "%s: unexpected packet size, expected:%d received:%d\n",
5054                                  cmdname(pi.cmd), (int)shs, pi.size);
5055                         goto err_out;
5056                 }
5057
5058                 if (shs) {
5059                         update_receiver_timing_details(connection, drbd_recv_all_warn);
5060                         err = drbd_recv_all_warn(connection, pi.data, shs);
5061                         if (err)
5062                                 goto err_out;
5063                         pi.size -= shs;
5064                 }
5065
5066                 update_receiver_timing_details(connection, cmd->fn);
5067                 err = cmd->fn(connection, &pi);
5068                 if (err) {
5069                         drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
5070                                  cmdname(pi.cmd), err, pi.size);
5071                         goto err_out;
5072                 }
5073         }
5074         return;
5075
5076     err_out:
5077         conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
5078 }
5079
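/*
 * conn_disconnect(): tear down after connection loss.  Force the connection
 * into NetworkFailure, stop the ack_receiver thread and destroy the
 * ack_sender workqueue, close the sockets, run per-volume cleanup via
 * drbd_disconnected(), and finally move to Unconnected (or StandAlone if a
 * disconnect was requested).
 */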
5080 static void conn_disconnect(struct drbd_connection *connection)
5081 {
5082         struct drbd_peer_device *peer_device;
5083         enum drbd_conns oc;
5084         int vnr;
5085
5086         if (connection->cstate == C_STANDALONE)
5087                 return;
5088
5089         /* We are about to start the cleanup after connection loss.
5090          * Make sure drbd_make_request knows about that.
5091          * Usually we should be in some network failure state already,
5092          * but just in case we are not, we fix it up here.
5093          */
5094         conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5095
5096         /* ack_receiver does not clean up anything. it must not interfere, either */
5097         drbd_thread_stop(&connection->ack_receiver);
5098         if (connection->ack_sender) {
5099                 destroy_workqueue(connection->ack_sender);
5100                 connection->ack_sender = NULL;
5101         }
5102         drbd_free_sock(connection);
5103
5104         rcu_read_lock();
5105         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5106                 struct drbd_device *device = peer_device->device;
5107                 kref_get(&device->kref);
5108                 rcu_read_unlock();
5109                 drbd_disconnected(peer_device);
5110                 kref_put(&device->kref, drbd_destroy_device);
5111                 rcu_read_lock();
5112         }
5113         rcu_read_unlock();
5114
5115         if (!list_empty(&connection->current_epoch->list))
5116                 drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
5117         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
5118         atomic_set(&connection->current_epoch->epoch_size, 0);
5119         connection->send.seen_any_write_yet = false;
5120
5121         drbd_info(connection, "Connection closed\n");
5122
5123         if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
5124                 conn_try_outdate_peer_async(connection);
5125
5126         spin_lock_irq(&connection->resource->req_lock);
5127         oc = connection->cstate;
5128         if (oc >= C_UNCONNECTED)
5129                 _conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
5130
5131         spin_unlock_irq(&connection->resource->req_lock);
5132
5133         if (oc == C_DISCONNECTING)
5134                 conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
5135 }
5136
5137 static int drbd_disconnected(struct drbd_peer_device *peer_device)
5138 {
5139         struct drbd_device *device = peer_device->device;
5140         unsigned int i;
5141
5142         /* wait for current activity to cease. */
5143         spin_lock_irq(&device->resource->req_lock);
5144         _drbd_wait_ee_list_empty(device, &device->active_ee);
5145         _drbd_wait_ee_list_empty(device, &device->sync_ee);
5146         _drbd_wait_ee_list_empty(device, &device->read_ee);
5147         spin_unlock_irq(&device->resource->req_lock);
5148
5149         /* We do not have data structures that would allow us to
5150          * get the rs_pending_cnt down to 0 again.
5151          *  * On C_SYNC_TARGET we do not have any data structures describing
5152          *    the pending RSDataRequest's we have sent.
5153          *  * On C_SYNC_SOURCE there is no data structure that tracks
5154          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
5155          *  And no, it is not the sum of the reference counts in the
5156          *  resync_LRU. The resync_LRU tracks the whole operation including
5157          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
5158          *  on the fly. */
5159         drbd_rs_cancel_all(device);
5160         device->rs_total = 0;
5161         device->rs_failed = 0;
5162         atomic_set(&device->rs_pending_cnt, 0);
5163         wake_up(&device->misc_wait);
5164
5165         del_timer_sync(&device->resync_timer);
5166         resync_timer_fn((unsigned long)device);
5167
5168         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
5169          * w_make_resync_request etc. which may still be on the worker queue
5170          * to be "canceled" */
5171         drbd_flush_workqueue(&peer_device->connection->sender_work);
5172
5173         drbd_finish_peer_reqs(device);
5174
5175         /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
5176            might have queued work again. The one before drbd_finish_peer_reqs() is
5177            necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
5178         drbd_flush_workqueue(&peer_device->connection->sender_work);
5179
5180         /* need to do it again, drbd_finish_peer_reqs() may have populated it
5181          * again via drbd_try_clear_on_disk_bm(). */
5182         drbd_rs_cancel_all(device);
5183
5184         kfree(device->p_uuid);
5185         device->p_uuid = NULL;
5186
5187         if (!drbd_suspended(device))
5188                 tl_clear(peer_device->connection);
5189
5190         drbd_md_sync(device);
5191
5192         if (get_ldev(device)) {
5193                 drbd_bitmap_io(device, &drbd_bm_write_copy_pages,
5194                                 "write from disconnected", BM_LOCKED_CHANGE_ALLOWED);
5195                 put_ldev(device);
5196         }
5197
5198         /* tcp_close and release of sendpage pages can be deferred.  I don't
5199          * want to use SO_LINGER, because apparently it can be deferred for
5200          * more than 20 seconds (longest time I checked).
5201          *
5202          * Actually we don't care for exactly when the network stack does its
5203          * put_page(), but release our reference on these pages right here.
5204          */
5205         i = drbd_free_peer_reqs(device, &device->net_ee);
5206         if (i)
5207                 drbd_info(device, "net_ee not empty, killed %u entries\n", i);
5208         i = atomic_read(&device->pp_in_use_by_net);
5209         if (i)
5210                 drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
5211         i = atomic_read(&device->pp_in_use);
5212         if (i)
5213                 drbd_info(device, "pp_in_use = %d, expected 0\n", i);
5214
5215         D_ASSERT(device, list_empty(&device->read_ee));
5216         D_ASSERT(device, list_empty(&device->active_ee));
5217         D_ASSERT(device, list_empty(&device->sync_ee));
5218         D_ASSERT(device, list_empty(&device->done_ee));
5219
5220         return 0;
5221 }
5222
5223 /*
5224  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
5225  * we can agree on is stored in agreed_pro_version.
5226  *
5227  * Feature flags and the reserved array should be enough room for future
5228  * enhancements of the handshake protocol and possible plugins.
5229  *
5230  * For now they are expected to be zero, and are ignored in any case.
5231  */
5232 static int drbd_send_features(struct drbd_connection *connection)
5233 {
5234         struct drbd_socket *sock;
5235         struct p_connection_features *p;
5236
5237         sock = &connection->data;
5238         p = conn_prepare_command(connection, sock);
5239         if (!p)
5240                 return -EIO;
5241         memset(p, 0, sizeof(*p));
5242         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
5243         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
5244         p->feature_flags = cpu_to_be32(PRO_FEATURES);
5245         return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
5246 }
5247
5248 /*
5249  * return values:
5250  *   1 yes, we have a valid connection
5251  *   0 oops, did not work out, please try again
5252  *  -1 peer talks different language,
5253  *     no point in trying again, please go standalone.
5254  */
5255 static int drbd_do_features(struct drbd_connection *connection)
5256 {
5257         /* ASSERT current == connection->receiver ... */
5258         struct p_connection_features *p;
5259         const int expect = sizeof(struct p_connection_features);
5260         struct packet_info pi;
5261         int err;
5262
5263         err = drbd_send_features(connection);
5264         if (err)
5265                 return 0;
5266
5267         err = drbd_recv_header(connection, &pi);
5268         if (err)
5269                 return 0;
5270
5271         if (pi.cmd != P_CONNECTION_FEATURES) {
5272                 drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
5273                          cmdname(pi.cmd), pi.cmd);
5274                 return -1;
5275         }
5276
5277         if (pi.size != expect) {
5278                 drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
5279                      expect, pi.size);
5280                 return -1;
5281         }
5282
5283         p = pi.data;
5284         err = drbd_recv_all_warn(connection, p, expect);
5285         if (err)
5286                 return 0;
5287
5288         p->protocol_min = be32_to_cpu(p->protocol_min);
5289         p->protocol_max = be32_to_cpu(p->protocol_max);
5290         if (p->protocol_max == 0)
5291                 p->protocol_max = p->protocol_min;
5292
5293         if (PRO_VERSION_MAX < p->protocol_min ||
5294             PRO_VERSION_MIN > p->protocol_max)
5295                 goto incompat;
5296
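        /* The agreed version is the highest one both sides support:
         * min(PRO_VERSION_MAX, peer's protocol_max), which is valid because
         * the check above ensured the ranges overlap.  Purely illustrative
         * example: if we offered 86..101 and the peer announced 86..96,
         * we agree on 96. */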
5297         connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
5298         connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
5299
5300         drbd_info(connection, "Handshake successful: "
5301              "Agreed network protocol version %d\n", connection->agreed_pro_version);
5302
5303         drbd_info(connection, "Feature flags enabled on protocol level: 0x%x%s%s%s.\n",
5304                   connection->agreed_features,
5305                   connection->agreed_features & DRBD_FF_TRIM ? " TRIM" : "",
5306                   connection->agreed_features & DRBD_FF_THIN_RESYNC ? " THIN_RESYNC" : "",
5307                   connection->agreed_features & DRBD_FF_WSAME ? " WRITE_SAME" :
5308                   connection->agreed_features ? "" : " none");
5309
5310         return 1;
5311
5312  incompat:
5313         drbd_err(connection, "incompatible DRBD dialects: "
5314             "I support %d-%d, peer supports %d-%d\n",
5315             PRO_VERSION_MIN, PRO_VERSION_MAX,
5316             p->protocol_min, p->protocol_max);
5317         return -1;
5318 }
5319
5320 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
5321 static int drbd_do_auth(struct drbd_connection *connection)
5322 {
5323         drbd_err(connection, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
5324         drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
5325         return -1;
5326 }
5327 #else
5328 #define CHALLENGE_LEN 64
5329
5330 /* Return value:
5331         1 - auth succeeded,
5332         0 - failed, try again (network error),
5333         -1 - auth failed, don't try again.
5334 */
5335
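/* Rough outline of the CRAM-HMAC exchange implemented below: each side sends
 * a random challenge (P_AUTH_CHALLENGE), computes the HMAC of the peer's
 * challenge keyed with the shared secret and returns it (P_AUTH_RESPONSE),
 * and finally compares the peer's response against the locally computed HMAC
 * of its own challenge.  A peer that simply reflects our own challenge back
 * is rejected. */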
5336 static int drbd_do_auth(struct drbd_connection *connection)
5337 {
5338         struct drbd_socket *sock;
5339         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
5340         char *response = NULL;
5341         char *right_response = NULL;
5342         char *peers_ch = NULL;
5343         unsigned int key_len;
5344         char secret[SHARED_SECRET_MAX]; /* 64 bytes */
5345         unsigned int resp_size;
5346         struct shash_desc *desc;
5347         struct packet_info pi;
5348         struct net_conf *nc;
5349         int err, rv;
5350
5351         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
5352
5353         rcu_read_lock();
5354         nc = rcu_dereference(connection->net_conf);
5355         key_len = strlen(nc->shared_secret);
5356         memcpy(secret, nc->shared_secret, key_len);
5357         rcu_read_unlock();
5358
5359         desc = kmalloc(sizeof(struct shash_desc) +
5360                        crypto_shash_descsize(connection->cram_hmac_tfm),
5361                        GFP_KERNEL);
5362         if (!desc) {
5363                 rv = -1;
5364                 goto fail;
5365         }
5366         desc->tfm = connection->cram_hmac_tfm;
5367         desc->flags = 0;
5368
5369         rv = crypto_shash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
5370         if (rv) {
5371                 drbd_err(connection, "crypto_shash_setkey() failed with %d\n", rv);
5372                 rv = -1;
5373                 goto fail;
5374         }
5375
5376         get_random_bytes(my_challenge, CHALLENGE_LEN);
5377
5378         sock = &connection->data;
5379         if (!conn_prepare_command(connection, sock)) {
5380                 rv = 0;
5381                 goto fail;
5382         }
5383         rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
5384                                 my_challenge, CHALLENGE_LEN);
5385         if (!rv)
5386                 goto fail;
5387
5388         err = drbd_recv_header(connection, &pi);
5389         if (err) {
5390                 rv = 0;
5391                 goto fail;
5392         }
5393
5394         if (pi.cmd != P_AUTH_CHALLENGE) {
5395                 drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
5396                          cmdname(pi.cmd), pi.cmd);
5397                 rv = 0;
5398                 goto fail;
5399         }
5400
5401         if (pi.size > CHALLENGE_LEN * 2) {
5402                 drbd_err(connection, "AuthChallenge payload too big.\n");
5403                 rv = -1;
5404                 goto fail;
5405         }
5406
5407         if (pi.size < CHALLENGE_LEN) {
5408                 drbd_err(connection, "AuthChallenge payload too small.\n");
5409                 rv = -1;
5410                 goto fail;
5411         }
5412
5413         peers_ch = kmalloc(pi.size, GFP_NOIO);
5414         if (peers_ch == NULL) {
5415                 drbd_err(connection, "kmalloc of peers_ch failed\n");
5416                 rv = -1;
5417                 goto fail;
5418         }
5419
5420         err = drbd_recv_all_warn(connection, peers_ch, pi.size);
5421         if (err) {
5422                 rv = 0;
5423                 goto fail;
5424         }
5425
5426         if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
5427                 drbd_err(connection, "Peer presented the same challenge!\n");
5428                 rv = -1;
5429                 goto fail;
5430         }
5431
5432         resp_size = crypto_shash_digestsize(connection->cram_hmac_tfm);
5433         response = kmalloc(resp_size, GFP_NOIO);
5434         if (response == NULL) {
5435                 drbd_err(connection, "kmalloc of response failed\n");
5436                 rv = -1;
5437                 goto fail;
5438         }
5439
5440         rv = crypto_shash_digest(desc, peers_ch, pi.size, response);
5441         if (rv) {
5442                 drbd_err(connection, "crypto_shash_digest() failed with %d\n", rv);
5443                 rv = -1;
5444                 goto fail;
5445         }
5446
5447         if (!conn_prepare_command(connection, sock)) {
5448                 rv = 0;
5449                 goto fail;
5450         }
5451         rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
5452                                 response, resp_size);
5453         if (!rv)
5454                 goto fail;
5455
5456         err = drbd_recv_header(connection, &pi);
5457         if (err) {
5458                 rv = 0;
5459                 goto fail;
5460         }
5461
5462         if (pi.cmd != P_AUTH_RESPONSE) {
5463                 drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
5464                          cmdname(pi.cmd), pi.cmd);
5465                 rv = 0;
5466                 goto fail;
5467         }
5468
5469         if (pi.size != resp_size) {
5470                 drbd_err(connection, "AuthResponse payload of wrong size\n");
5471                 rv = 0;
5472                 goto fail;
5473         }
5474
5475         err = drbd_recv_all_warn(connection, response, resp_size);
5476         if (err) {
5477                 rv = 0;
5478                 goto fail;
5479         }
5480
5481         right_response = kmalloc(resp_size, GFP_NOIO);
5482         if (right_response == NULL) {
5483                 drbd_err(connection, "kmalloc of right_response failed\n");
5484                 rv = -1;
5485                 goto fail;
5486         }
5487
5488         rv = crypto_shash_digest(desc, my_challenge, CHALLENGE_LEN,
5489                                  right_response);
5490         if (rv) {
5491                 drbd_err(connection, "crypto_shash_digest() failed with %d\n", rv);
5492                 rv = -1;
5493                 goto fail;
5494         }
5495
5496         rv = !memcmp(response, right_response, resp_size);
5497
5498         if (rv)
5499                 drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
5500                      resp_size);
5501         else
5502                 rv = -1;
5503
5504  fail:
5505         kfree(peers_ch);
5506         kfree(response);
5507         kfree(right_response);
5508         if (desc) {
5509                 shash_desc_zero(desc);
5510                 kfree(desc);
5511         }
5512
5513         return rv;
5514 }
5515 #endif
5516
5517 int drbd_receiver(struct drbd_thread *thi)
5518 {
5519         struct drbd_connection *connection = thi->connection;
5520         int h;
5521
5522         drbd_info(connection, "receiver (re)started\n");
5523
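        /* conn_connect() as interpreted below: > 0 means we are connected and
         * enter drbdd(), 0 is a transient failure (tear down and retry after a
         * short pause), -1 is fatal and makes us drop the network configuration. */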
5524         do {
5525                 h = conn_connect(connection);
5526                 if (h == 0) {
5527                         conn_disconnect(connection);
5528                         schedule_timeout_interruptible(HZ);
5529                 }
5530                 if (h == -1) {
5531                         drbd_warn(connection, "Discarding network configuration.\n");
5532                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5533                 }
5534         } while (h == 0);
5535
5536         if (h > 0)
5537                 drbdd(connection);
5538
5539         conn_disconnect(connection);
5540
5541         drbd_info(connection, "receiver terminated\n");
5542         return 0;
5543 }
5544
5545 /* ********* acknowledge sender ******** */
5546
5547 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5548 {
5549         struct p_req_state_reply *p = pi->data;
5550         int retcode = be32_to_cpu(p->retcode);
5551
5552         if (retcode >= SS_SUCCESS) {
5553                 set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
5554         } else {
5555                 set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5556                 drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5557                          drbd_set_st_err_str(retcode), retcode);
5558         }
5559         wake_up(&connection->ping_wait);
5560
5561         return 0;
5562 }
5563
5564 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5565 {
5566         struct drbd_peer_device *peer_device;
5567         struct drbd_device *device;
5568         struct p_req_state_reply *p = pi->data;
5569         int retcode = be32_to_cpu(p->retcode);
5570
5571         peer_device = conn_peer_device(connection, pi->vnr);
5572         if (!peer_device)
5573                 return -EIO;
5574         device = peer_device->device;
5575
5576         if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5577                 D_ASSERT(device, connection->agreed_pro_version < 100);
5578                 return got_conn_RqSReply(connection, pi);
5579         }
5580
5581         if (retcode >= SS_SUCCESS) {
5582                 set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5583         } else {
5584                 set_bit(CL_ST_CHG_FAIL, &device->flags);
5585                 drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5586                         drbd_set_st_err_str(retcode), retcode);
5587         }
5588         wake_up(&device->state_wait);
5589
5590         return 0;
5591 }
5592
5593 static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
5594 {
5595         return drbd_send_ping_ack(connection);
5597 }
5598
5599 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5600 {
5603         if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5604                 wake_up(&connection->ping_wait);
5605
5606         return 0;
5607 }
5608
5609 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5610 {
5611         struct drbd_peer_device *peer_device;
5612         struct drbd_device *device;
5613         struct p_block_ack *p = pi->data;
5614         sector_t sector = be64_to_cpu(p->sector);
5615         int blksize = be32_to_cpu(p->blksize);
5616
5617         peer_device = conn_peer_device(connection, pi->vnr);
5618         if (!peer_device)
5619                 return -EIO;
5620         device = peer_device->device;
5621
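        /* P_RS_IS_IN_SYNC is only sent during checksum based resync, which
         * requires protocol version 89 or newer; hence the assert below. */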
5622         D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5623
5624         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5625
5626         if (get_ldev(device)) {
5627                 drbd_rs_complete_io(device, sector);
5628                 drbd_set_in_sync(device, sector, blksize);
5629                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5630                 device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5631                 put_ldev(device);
5632         }
5633         dec_rs_pending(device);
5634         atomic_add(blksize >> 9, &device->rs_sect_in);
5635
5636         return 0;
5637 }
5638
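/* Look up the request identified by (id, sector) in the given rb tree, feed
 * "what" into the request state machine, and complete the master bio if that
 * transition finished the request. */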
5639 static int
5640 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5641                               struct rb_root *root, const char *func,
5642                               enum drbd_req_event what, bool missing_ok)
5643 {
5644         struct drbd_request *req;
5645         struct bio_and_error m;
5646
5647         spin_lock_irq(&device->resource->req_lock);
5648         req = find_request(device, root, id, sector, missing_ok, func);
5649         if (unlikely(!req)) {
5650                 spin_unlock_irq(&device->resource->req_lock);
5651                 return -EIO;
5652         }
5653         __req_mod(req, what, &m);
5654         spin_unlock_irq(&device->resource->req_lock);
5655
5656         if (m.bio)
5657                 complete_master_bio(device, &m);
5658         return 0;
5659 }
5660
5661 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5662 {
5663         struct drbd_peer_device *peer_device;
5664         struct drbd_device *device;
5665         struct p_block_ack *p = pi->data;
5666         sector_t sector = be64_to_cpu(p->sector);
5667         int blksize = be32_to_cpu(p->blksize);
5668         enum drbd_req_event what;
5669
5670         peer_device = conn_peer_device(connection, pi->vnr);
5671         if (!peer_device)
5672                 return -EIO;
5673         device = peer_device->device;
5674
5675         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5676
5677         if (p->block_id == ID_SYNCER) {
5678                 drbd_set_in_sync(device, sector, blksize);
5679                 dec_rs_pending(device);
5680                 return 0;
5681         }
5682         switch (pi->cmd) {
5683         case P_RS_WRITE_ACK:
5684                 what = WRITE_ACKED_BY_PEER_AND_SIS;
5685                 break;
5686         case P_WRITE_ACK:
5687                 what = WRITE_ACKED_BY_PEER;
5688                 break;
5689         case P_RECV_ACK:
5690                 what = RECV_ACKED_BY_PEER;
5691                 break;
5692         case P_SUPERSEDED:
5693                 what = CONFLICT_RESOLVED;
5694                 break;
5695         case P_RETRY_WRITE:
5696                 what = POSTPONE_WRITE;
5697                 break;
5698         default:
5699                 BUG();
5700         }
5701
5702         return validate_req_change_req_state(device, p->block_id, sector,
5703                                              &device->write_requests, __func__,
5704                                              what, false);
5705 }
5706
5707 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5708 {
5709         struct drbd_peer_device *peer_device;
5710         struct drbd_device *device;
5711         struct p_block_ack *p = pi->data;
5712         sector_t sector = be64_to_cpu(p->sector);
5713         int size = be32_to_cpu(p->blksize);
5714         int err;
5715
5716         peer_device = conn_peer_device(connection, pi->vnr);
5717         if (!peer_device)
5718                 return -EIO;
5719         device = peer_device->device;
5720
5721         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5722
5723         if (p->block_id == ID_SYNCER) {
5724                 dec_rs_pending(device);
5725                 drbd_rs_failed_io(device, sector, size);
5726                 return 0;
5727         }
5728
5729         err = validate_req_change_req_state(device, p->block_id, sector,
5730                                             &device->write_requests, __func__,
5731                                             NEG_ACKED, true);
5732         if (err) {
5733                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5734                    The master bio might already be completed, therefore the
5735                    request is no longer in the collision hash. */
5736                 /* In Protocol B we might already have got a P_RECV_ACK
5737                    but then get a P_NEG_ACK afterwards. */
5738                 drbd_set_out_of_sync(device, sector, size);
5739         }
5740         return 0;
5741 }
5742
5743 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5744 {
5745         struct drbd_peer_device *peer_device;
5746         struct drbd_device *device;
5747         struct p_block_ack *p = pi->data;
5748         sector_t sector = be64_to_cpu(p->sector);
5749
5750         peer_device = conn_peer_device(connection, pi->vnr);
5751         if (!peer_device)
5752                 return -EIO;
5753         device = peer_device->device;
5754
5755         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5756
5757         drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5758             (unsigned long long)sector, be32_to_cpu(p->blksize));
5759
5760         return validate_req_change_req_state(device, p->block_id, sector,
5761                                              &device->read_requests, __func__,
5762                                              NEG_ACKED, false);
5763 }
5764
5765 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5766 {
5767         struct drbd_peer_device *peer_device;
5768         struct drbd_device *device;
5769         sector_t sector;
5770         int size;
5771         struct p_block_ack *p = pi->data;
5772
5773         peer_device = conn_peer_device(connection, pi->vnr);
5774         if (!peer_device)
5775                 return -EIO;
5776         device = peer_device->device;
5777
5778         sector = be64_to_cpu(p->sector);
5779         size = be32_to_cpu(p->blksize);
5780
5781         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5782
5783         dec_rs_pending(device);
5784
5785         if (get_ldev_if_state(device, D_FAILED)) {
5786                 drbd_rs_complete_io(device, sector);
5787                 switch (pi->cmd) {
5788                 case P_NEG_RS_DREPLY:
5789                         drbd_rs_failed_io(device, sector, size);
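                        /* fall through */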
5790                 case P_RS_CANCEL:
5791                         break;
5792                 default:
5793                         BUG();
5794                 }
5795                 put_ldev(device);
5796         }
5797
5798         return 0;
5799 }
5800
5801 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5802 {
5803         struct p_barrier_ack *p = pi->data;
5804         struct drbd_peer_device *peer_device;
5805         int vnr;
5806
5807         tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5808
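        /* The peer just acknowledged a whole epoch of writes; tl_release()
         * completes the corresponding requests.  Any volume that was running
         * in Ahead mode and has no application I/O in flight any more arms
         * its start_resync timer so it can become a sync source again. */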
5809         rcu_read_lock();
5810         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5811                 struct drbd_device *device = peer_device->device;
5812
5813                 if (device->state.conn == C_AHEAD &&
5814                     atomic_read(&device->ap_in_flight) == 0 &&
5815                     !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5816                         device->start_resync_timer.expires = jiffies + HZ;
5817                         add_timer(&device->start_resync_timer);
5818                 }
5819         }
5820         rcu_read_unlock();
5821
5822         return 0;
5823 }
5824
5825 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5826 {
5827         struct drbd_peer_device *peer_device;
5828         struct drbd_device *device;
5829         struct p_block_ack *p = pi->data;
5830         struct drbd_device_work *dw;
5831         sector_t sector;
5832         int size;
5833
5834         peer_device = conn_peer_device(connection, pi->vnr);
5835         if (!peer_device)
5836                 return -EIO;
5837         device = peer_device->device;
5838
5839         sector = be64_to_cpu(p->sector);
5840         size = be32_to_cpu(p->blksize);
5841
5842         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5843
5844         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5845                 drbd_ov_out_of_sync_found(device, sector, size);
5846         else
5847                 ov_out_of_sync_print(device);
5848
5849         if (!get_ldev(device))
5850                 return 0;
5851
5852         drbd_rs_complete_io(device, sector);
5853         dec_rs_pending(device);
5854
5855         --device->ov_left;
5856
5857         /* let's advance progress step marks only for every other megabyte */
5858         if ((device->ov_left & 0x200) == 0x200)
5859                 drbd_advance_rs_marks(device, device->ov_left);
5860
5861         if (device->ov_left == 0) {
5862                 dw = kmalloc(sizeof(*dw), GFP_NOIO);
5863                 if (dw) {
5864                         dw->w.cb = w_ov_finished;
5865                         dw->device = device;
5866                         drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5867                 } else {
5868                         drbd_err(device, "kmalloc(dw) failed.");
5869                         ov_out_of_sync_print(device);
5870                         drbd_resync_finished(device);
5871                 }
5872         }
5873         put_ldev(device);
5874         return 0;
5875 }
5876
5877 static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
5878 {
5879         return 0;
5880 }
5881
5882 struct meta_sock_cmd {
5883         size_t pkt_size;
5884         int (*fn)(struct drbd_connection *connection, struct packet_info *);
5885 };
5886
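/* Note on units: ping_int is configured in seconds, while ping_timeo is in
 * tenths of a second; that is why the ping timeout gets the extra /10 below. */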
5887 static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout)
5888 {
5889         long t;
5890         struct net_conf *nc;
5891
5892         rcu_read_lock();
5893         nc = rcu_dereference(connection->net_conf);
5894         t = ping_timeout ? nc->ping_timeo : nc->ping_int;
5895         rcu_read_unlock();
5896
5897         t *= HZ;
5898         if (ping_timeout)
5899                 t /= 10;
5900
5901         connection->meta.socket->sk->sk_rcvtimeo = t;
5902 }
5903
5904 static void set_ping_timeout(struct drbd_connection *connection)
5905 {
5906         set_rcvtimeo(connection, 1);
5907 }
5908
5909 static void set_idle_timeout(struct drbd_connection *connection)
5910 {
5911         set_rcvtimeo(connection, 0);
5912 }
5913
5914 static struct meta_sock_cmd ack_receiver_tbl[] = {
5915         [P_PING]            = { 0, got_Ping },
5916         [P_PING_ACK]        = { 0, got_PingAck },
5917         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
5918         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
5919         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5920         [P_SUPERSEDED]      = { sizeof(struct p_block_ack), got_BlockAck },
5921         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
5922         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
5923         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5924         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
5925         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
5926         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5927         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5928         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5929         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5930         [P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
5931         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
5932 };
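/* The ack receiver below reads a packet header first, uses ack_receiver_tbl[]
 * to find the handler and the expected payload size for the command, then
 * reads the payload and dispatches.  Unknown commands or payload size
 * mismatches tear down the connection. */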
5933
5934 int drbd_ack_receiver(struct drbd_thread *thi)
5935 {
5936         struct drbd_connection *connection = thi->connection;
5937         struct meta_sock_cmd *cmd = NULL;
5938         struct packet_info pi;
5939         unsigned long pre_recv_jif;
5940         int rv;
5941         void *buf    = connection->meta.rbuf;
5942         int received = 0;
5943         unsigned int header_size = drbd_header_size(connection);
5944         int expect   = header_size;
5945         bool ping_timeout_active = false;
5946         struct sched_param param = { .sched_priority = 2 };
5947
5948         rv = sched_setscheduler(current, SCHED_RR, &param);
5949         if (rv < 0)
5950                 drbd_err(connection, "drbd_ack_receiver: failed to set RT priority, ret=%d\n", rv);
5951
5952         while (get_t_state(thi) == RUNNING) {
5953                 drbd_thread_current_set_cpu(thi);
5954
5955                 conn_reclaim_net_peer_reqs(connection);
5956
5957                 if (test_and_clear_bit(SEND_PING, &connection->flags)) {
5958                         if (drbd_send_ping(connection)) {
5959                                 drbd_err(connection, "drbd_send_ping has failed\n");
5960                                 goto reconnect;
5961                         }
5962                         set_ping_timeout(connection);
5963                         ping_timeout_active = true;
5964                 }
5965
5966                 pre_recv_jif = jiffies;
5967                 rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
5968
5969                 /* Note:
5970                  * -EINTR        (on meta) we got a signal
5971                  * -EAGAIN       (on meta) rcvtimeo expired
5972                  * -ECONNRESET   other side closed the connection
5973                  * -ERESTARTSYS  (on data) we got a signal
5974                  * rv <  0       other than above: unexpected error!
5975                  * rv == expected: full header or command
5976                  * rv <  expected: "woken" by signal during receive
5977                  * rv == 0       : "connection shut down by peer"
5978                  */
5979                 if (likely(rv > 0)) {
5980                         received += rv;
5981                         buf      += rv;
5982                 } else if (rv == 0) {
5983                         if (test_bit(DISCONNECT_SENT, &connection->flags)) {
5984                                 long t;
5985                                 rcu_read_lock();
5986                                 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
5987                                 rcu_read_unlock();
5988
5989                                 t = wait_event_timeout(connection->ping_wait,
5990                                                        connection->cstate < C_WF_REPORT_PARAMS,
5991                                                        t);
5992                                 if (t)
5993                                         break;
5994                         }
5995                         drbd_err(connection, "meta connection shut down by peer.\n");
5996                         goto reconnect;
5997                 } else if (rv == -EAGAIN) {
5998                         /* If the data socket received something meanwhile,
5999                          * that is good enough: peer is still alive. */
6000                         if (time_after(connection->last_received, pre_recv_jif))
6001                                 continue;
6002                         if (ping_timeout_active) {
6003                                 drbd_err(connection, "PingAck did not arrive in time.\n");
6004                                 goto reconnect;
6005                         }
6006                         set_bit(SEND_PING, &connection->flags);
6007                         continue;
6008                 } else if (rv == -EINTR) {
6009                         /* maybe drbd_thread_stop(): the while condition will notice.
6010                          * maybe woken for send_ping: we'll send a ping above,
6011                          * and change the rcvtimeo */
6012                         flush_signals(current);
6013                         continue;
6014                 } else {
6015                         drbd_err(connection, "sock_recvmsg returned %d\n", rv);
6016                         goto reconnect;
6017                 }
6018
6019                 if (received == expect && cmd == NULL) {
6020                         if (decode_header(connection, connection->meta.rbuf, &pi))
6021                                 goto reconnect;
6022                         cmd = &ack_receiver_tbl[pi.cmd];
6023                         if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) || !cmd->fn) {
6024                                 drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
6025                                          cmdname(pi.cmd), pi.cmd);
6026                                 goto disconnect;
6027                         }
6028                         expect = header_size + cmd->pkt_size;
6029                         if (pi.size != expect - header_size) {
6030                                 drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
6031                                         pi.cmd, pi.size);
6032                                 goto reconnect;
6033                         }
6034                 }
6035                 if (received == expect) {
6036                         bool err;
6037
6038                         err = cmd->fn(connection, &pi);
6039                         if (err) {
6040                                 drbd_err(connection, "%pf failed\n", cmd->fn);
6041                                 goto reconnect;
6042                         }
6043
6044                         connection->last_received = jiffies;
6045
6046                         if (cmd == &ack_receiver_tbl[P_PING_ACK]) {
6047                                 set_idle_timeout(connection);
6048                                 ping_timeout_active = false;
6049                         }
6050
6051                         buf      = connection->meta.rbuf;
6052                         received = 0;
6053                         expect   = header_size;
6054                         cmd      = NULL;
6055                 }
6056         }
6057
6058         if (0) {
6059 reconnect:
6060                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
6061                 conn_md_sync(connection);
6062         }
6063         if (0) {
6064 disconnect:
6065                 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
6066         }
6067
6068         drbd_info(connection, "ack_receiver terminated\n");
6069
6070         return 0;
6071 }
6072
6073 void drbd_send_acks_wf(struct work_struct *ws)
6074 {
6075         struct drbd_peer_device *peer_device =
6076                 container_of(ws, struct drbd_peer_device, send_acks_work);
6077         struct drbd_connection *connection = peer_device->connection;
6078         struct drbd_device *device = peer_device->device;
6079         struct net_conf *nc;
6080         int tcp_cork, err;
6081
6082         rcu_read_lock();
6083         nc = rcu_dereference(connection->net_conf);
6084         tcp_cork = nc->tcp_cork;
6085         rcu_read_unlock();
6086
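        /* With tcp_cork enabled, cork the meta socket so that all acks queued
         * by drbd_finish_peer_reqs() below go out in as few TCP segments as
         * possible; the uncork at the end flushes them. */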
6087         if (tcp_cork)
6088                 drbd_tcp_cork(connection->meta.socket);
6089
6090         err = drbd_finish_peer_reqs(device);
6091         kref_put(&device->kref, drbd_destroy_device);
6092         /* The matching kref_get() is in drbd_endio_write_sec_final(). It keeps the
6093            struct work_struct send_acks_work, embedded in the peer_device object, alive. */
6094
6095         if (err) {
6096                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
6097                 return;
6098         }
6099
6100         if (tcp_cork)
6101                 drbd_tcp_uncork(connection->meta.socket);
6102
6103         return;
6104 }