Merge tag 'chrome-platform-for-linus-4.13' of git://git.kernel.org/pub/scm/linux...
[linux/fpc-iii.git] / drivers / block / drbd / drbd_receiver.c
blobc7e95e6380fb46d1503c6a595f6cb716c4d0f6ef
1 /*
2 drbd_receiver.c
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26 #include <linux/module.h>
28 #include <linux/uaccess.h>
29 #include <net/sock.h>
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <uapi/linux/sched/types.h>
40 #include <linux/sched/signal.h>
41 #include <linux/pkt_sched.h>
42 #define __KERNEL_SYSCALLS__
43 #include <linux/unistd.h>
44 #include <linux/vmalloc.h>
45 #include <linux/random.h>
46 #include <linux/string.h>
47 #include <linux/scatterlist.h>
48 #include "drbd_int.h"
49 #include "drbd_protocol.h"
50 #include "drbd_req.h"
51 #include "drbd_vli.h"
/* Bitwise OR of the DRBD_FF_* feature flags TRIM, THIN_RESYNC and WSAME. */
#define PRO_FEATURES (DRBD_FF_TRIM | DRBD_FF_THIN_RESYNC | DRBD_FF_WSAME)
55 struct packet_info {
56 enum drbd_packet cmd;
57 unsigned int size;
58 unsigned int vnr;
59 void *data;
/* Result type of drbd_may_finish_epoch(). */
enum finish_epoch {
	FE_STILL_LIVE,
	FE_DESTROYED,
	FE_RECYCLED,
};
/* Forward declarations of static functions that are referenced before their definition. */
static int drbd_do_features(struct drbd_connection *connection);
static int drbd_do_auth(struct drbd_connection *connection);
static int drbd_disconnected(struct drbd_peer_device *peer_device);
static void conn_wait_active_ee_empty(struct drbd_connection *connection);
static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
					       struct drbd_epoch *epoch,
					       enum epoch_event ev);
static int e_end_block(struct drbd_work *w, int unused);
/* Allocation flags used by __drbd_alloc_pages() for its opportunistic alloc_page() calls. */
#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
/*
 * Some helper functions to deal with singly linked page lists,
 * page->private being our "next" pointer.
 */
83 /* If at least n pages are linked at head, get n pages off.
84 * Otherwise, don't modify head, and return NULL.
85 * Locking is the responsibility of the caller.
87 static struct page *page_chain_del(struct page **head, int n)
89 struct page *page;
90 struct page *tmp;
92 BUG_ON(!n);
93 BUG_ON(!head);
95 page = *head;
97 if (!page)
98 return NULL;
100 while (page) {
101 tmp = page_chain_next(page);
102 if (--n == 0)
103 break; /* found sufficient pages */
104 if (tmp == NULL)
105 /* insufficient pages, don't use any of them. */
106 return NULL;
107 page = tmp;
110 /* add end of list marker for the returned list */
111 set_page_private(page, 0);
112 /* actual return value, and adjustment of head */
113 page = *head;
114 *head = tmp;
115 return page;
/*
 * Return the last page of the chain starting at @page; if @len is not NULL,
 * store the number of pages in the chain there.
 *
 * May be used outside of locks to find the tail of a (usually short)
 * "private" page chain, before adding it back to a global chain head
 * with page_chain_add() under a spinlock.
 */
static struct page *page_chain_tail(struct page *page, int *len)
{
	struct page *next;
	int count = 1;

	for (;;) {
		next = page_chain_next(page);
		if (!next)
			break;
		page = next;
		count++;
	}

	if (len)
		*len = count;
	return page;
}
132 static int page_chain_free(struct page *page)
134 struct page *tmp;
135 int i = 0;
136 page_chain_for_each_safe(page, tmp) {
137 put_page(page);
138 ++i;
140 return i;
143 static void page_chain_add(struct page **head,
144 struct page *chain_first, struct page *chain_last)
146 #if 1
147 struct page *tmp;
148 tmp = page_chain_tail(chain_first, NULL);
149 BUG_ON(tmp != chain_last);
150 #endif
152 /* add chain to head */
153 set_page_private(chain_last, (unsigned long)*head);
154 *head = chain_first;
157 static struct page *__drbd_alloc_pages(struct drbd_device *device,
158 unsigned int number)
160 struct page *page = NULL;
161 struct page *tmp = NULL;
162 unsigned int i = 0;
164 /* Yes, testing drbd_pp_vacant outside the lock is racy.
165 * So what. It saves a spin_lock. */
166 if (drbd_pp_vacant >= number) {
167 spin_lock(&drbd_pp_lock);
168 page = page_chain_del(&drbd_pp_pool, number);
169 if (page)
170 drbd_pp_vacant -= number;
171 spin_unlock(&drbd_pp_lock);
172 if (page)
173 return page;
176 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
177 * "criss-cross" setup, that might cause write-out on some other DRBD,
178 * which in turn might block on the other node at this very place. */
179 for (i = 0; i < number; i++) {
180 tmp = alloc_page(GFP_TRY);
181 if (!tmp)
182 break;
183 set_page_private(tmp, (unsigned long)page);
184 page = tmp;
187 if (i == number)
188 return page;
190 /* Not enough pages immediately available this time.
191 * No need to jump around here, drbd_alloc_pages will retry this
192 * function "soon". */
193 if (page) {
194 tmp = page_chain_tail(page, NULL);
195 spin_lock(&drbd_pp_lock);
196 page_chain_add(&drbd_pp_pool, page, tmp);
197 drbd_pp_vacant += i;
198 spin_unlock(&drbd_pp_lock);
200 return NULL;
203 static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
204 struct list_head *to_be_freed)
206 struct drbd_peer_request *peer_req, *tmp;
208 /* The EEs are always appended to the end of the list. Since
209 they are sent in order over the wire, they have to finish
210 in order. As soon as we see the first not finished we can
211 stop to examine the list... */
213 list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
214 if (drbd_peer_req_has_active_page(peer_req))
215 break;
216 list_move(&peer_req->w.list, to_be_freed);
220 static void drbd_reclaim_net_peer_reqs(struct drbd_device *device)
222 LIST_HEAD(reclaimed);
223 struct drbd_peer_request *peer_req, *t;
225 spin_lock_irq(&device->resource->req_lock);
226 reclaim_finished_net_peer_reqs(device, &reclaimed);
227 spin_unlock_irq(&device->resource->req_lock);
228 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
229 drbd_free_net_peer_req(device, peer_req);
232 static void conn_reclaim_net_peer_reqs(struct drbd_connection *connection)
234 struct drbd_peer_device *peer_device;
235 int vnr;
237 rcu_read_lock();
238 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
239 struct drbd_device *device = peer_device->device;
240 if (!atomic_read(&device->pp_in_use_by_net))
241 continue;
243 kref_get(&device->kref);
244 rcu_read_unlock();
245 drbd_reclaim_net_peer_reqs(device);
246 kref_put(&device->kref, drbd_destroy_device);
247 rcu_read_lock();
249 rcu_read_unlock();
253 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
254 * @device: DRBD device.
255 * @number: number of pages requested
256 * @retry: whether to retry, if not enough pages are available right now
258 * Tries to allocate number pages, first from our own page pool, then from
259 * the kernel.
260 * Possibly retry until DRBD frees sufficient pages somewhere else.
262 * If this allocation would exceed the max_buffers setting, we throttle
263 * allocation (schedule_timeout) to give the system some room to breathe.
265 * We do not use max-buffers as hard limit, because it could lead to
266 * congestion and further to a distributed deadlock during online-verify or
267 * (checksum based) resync, if the max-buffers, socket buffer sizes and
268 * resync-rate settings are mis-configured.
270 * Returns a page chain linked via page->private.
272 struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
273 bool retry)
275 struct drbd_device *device = peer_device->device;
276 struct page *page = NULL;
277 struct net_conf *nc;
278 DEFINE_WAIT(wait);
279 unsigned int mxb;
281 rcu_read_lock();
282 nc = rcu_dereference(peer_device->connection->net_conf);
283 mxb = nc ? nc->max_buffers : 1000000;
284 rcu_read_unlock();
286 if (atomic_read(&device->pp_in_use) < mxb)
287 page = __drbd_alloc_pages(device, number);
289 /* Try to keep the fast path fast, but occasionally we need
290 * to reclaim the pages we lended to the network stack. */
291 if (page && atomic_read(&device->pp_in_use_by_net) > 512)
292 drbd_reclaim_net_peer_reqs(device);
294 while (page == NULL) {
295 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
297 drbd_reclaim_net_peer_reqs(device);
299 if (atomic_read(&device->pp_in_use) < mxb) {
300 page = __drbd_alloc_pages(device, number);
301 if (page)
302 break;
305 if (!retry)
306 break;
308 if (signal_pending(current)) {
309 drbd_warn(device, "drbd_alloc_pages interrupted!\n");
310 break;
313 if (schedule_timeout(HZ/10) == 0)
314 mxb = UINT_MAX;
316 finish_wait(&drbd_pp_wait, &wait);
318 if (page)
319 atomic_add(number, &device->pp_in_use);
320 return page;
323 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
324 * Is also used from inside an other spin_lock_irq(&resource->req_lock);
325 * Either links the page chain back to the global pool,
326 * or returns all pages to the system. */
327 static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
329 atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
330 int i;
332 if (page == NULL)
333 return;
335 if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
336 i = page_chain_free(page);
337 else {
338 struct page *tmp;
339 tmp = page_chain_tail(page, &i);
340 spin_lock(&drbd_pp_lock);
341 page_chain_add(&drbd_pp_pool, page, tmp);
342 drbd_pp_vacant += i;
343 spin_unlock(&drbd_pp_lock);
345 i = atomic_sub_return(i, a);
346 if (i < 0)
347 drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
348 is_net ? "pp_in_use_by_net" : "pp_in_use", i);
349 wake_up(&drbd_pp_wait);
/*
 * Locking rules for the peer request ("ee") helpers:
 *
 * You need to hold the req_lock:
 *  _drbd_wait_ee_list_empty()
 *
 * You must not have the req_lock:
 *  drbd_free_peer_req()
 *  drbd_alloc_peer_req()
 *  drbd_free_peer_reqs()
 *  drbd_ee_fix_bhs()
 *  drbd_finish_peer_reqs()
 *  drbd_clear_done_ee()
 *  drbd_wait_ee_list_empty()
 */
366 /* normal: payload_size == request size (bi_size)
367 * w_same: payload_size == logical_block_size
368 * trim: payload_size == 0 */
369 struct drbd_peer_request *
370 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
371 unsigned int request_size, unsigned int payload_size, gfp_t gfp_mask) __must_hold(local)
373 struct drbd_device *device = peer_device->device;
374 struct drbd_peer_request *peer_req;
375 struct page *page = NULL;
376 unsigned nr_pages = (payload_size + PAGE_SIZE -1) >> PAGE_SHIFT;
378 if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
379 return NULL;
381 peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
382 if (!peer_req) {
383 if (!(gfp_mask & __GFP_NOWARN))
384 drbd_err(device, "%s: allocation failed\n", __func__);
385 return NULL;
388 if (nr_pages) {
389 page = drbd_alloc_pages(peer_device, nr_pages,
390 gfpflags_allow_blocking(gfp_mask));
391 if (!page)
392 goto fail;
395 memset(peer_req, 0, sizeof(*peer_req));
396 INIT_LIST_HEAD(&peer_req->w.list);
397 drbd_clear_interval(&peer_req->i);
398 peer_req->i.size = request_size;
399 peer_req->i.sector = sector;
400 peer_req->submit_jif = jiffies;
401 peer_req->peer_device = peer_device;
402 peer_req->pages = page;
404 * The block_id is opaque to the receiver. It is not endianness
405 * converted, and sent back to the sender unchanged.
407 peer_req->block_id = id;
409 return peer_req;
411 fail:
412 mempool_free(peer_req, drbd_ee_mempool);
413 return NULL;
416 void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
417 int is_net)
419 might_sleep();
420 if (peer_req->flags & EE_HAS_DIGEST)
421 kfree(peer_req->digest);
422 drbd_free_pages(device, peer_req->pages, is_net);
423 D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
424 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
425 if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
426 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
427 drbd_al_complete_io(device, &peer_req->i);
429 mempool_free(peer_req, drbd_ee_mempool);
432 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
434 LIST_HEAD(work_list);
435 struct drbd_peer_request *peer_req, *t;
436 int count = 0;
437 int is_net = list == &device->net_ee;
439 spin_lock_irq(&device->resource->req_lock);
440 list_splice_init(list, &work_list);
441 spin_unlock_irq(&device->resource->req_lock);
443 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
444 __drbd_free_peer_req(device, peer_req, is_net);
445 count++;
447 return count;
451 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
453 static int drbd_finish_peer_reqs(struct drbd_device *device)
455 LIST_HEAD(work_list);
456 LIST_HEAD(reclaimed);
457 struct drbd_peer_request *peer_req, *t;
458 int err = 0;
460 spin_lock_irq(&device->resource->req_lock);
461 reclaim_finished_net_peer_reqs(device, &reclaimed);
462 list_splice_init(&device->done_ee, &work_list);
463 spin_unlock_irq(&device->resource->req_lock);
465 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
466 drbd_free_net_peer_req(device, peer_req);
468 /* possible callbacks here:
469 * e_end_block, and e_end_resync_block, e_send_superseded.
470 * all ignore the last argument.
472 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
473 int err2;
475 /* list_del not necessary, next/prev members not touched */
476 err2 = peer_req->w.cb(&peer_req->w, !!err);
477 if (!err)
478 err = err2;
479 drbd_free_peer_req(device, peer_req);
481 wake_up(&device->ee_wait);
483 return err;
486 static void _drbd_wait_ee_list_empty(struct drbd_device *device,
487 struct list_head *head)
489 DEFINE_WAIT(wait);
491 /* avoids spin_lock/unlock
492 * and calling prepare_to_wait in the fast path */
493 while (!list_empty(head)) {
494 prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
495 spin_unlock_irq(&device->resource->req_lock);
496 io_schedule();
497 finish_wait(&device->ee_wait, &wait);
498 spin_lock_irq(&device->resource->req_lock);
502 static void drbd_wait_ee_list_empty(struct drbd_device *device,
503 struct list_head *head)
505 spin_lock_irq(&device->resource->req_lock);
506 _drbd_wait_ee_list_empty(device, head);
507 spin_unlock_irq(&device->resource->req_lock);
510 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
512 struct kvec iov = {
513 .iov_base = buf,
514 .iov_len = size,
516 struct msghdr msg = {
517 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
519 return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags);
522 static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
524 int rv;
526 rv = drbd_recv_short(connection->data.socket, buf, size, 0);
528 if (rv < 0) {
529 if (rv == -ECONNRESET)
530 drbd_info(connection, "sock was reset by peer\n");
531 else if (rv != -ERESTARTSYS)
532 drbd_err(connection, "sock_recvmsg returned %d\n", rv);
533 } else if (rv == 0) {
534 if (test_bit(DISCONNECT_SENT, &connection->flags)) {
535 long t;
536 rcu_read_lock();
537 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
538 rcu_read_unlock();
540 t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
542 if (t)
543 goto out;
545 drbd_info(connection, "sock was shut down by peer\n");
548 if (rv != size)
549 conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
551 out:
552 return rv;
555 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
557 int err;
559 err = drbd_recv(connection, buf, size);
560 if (err != size) {
561 if (err >= 0)
562 err = -EIO;
563 } else
564 err = 0;
565 return err;
568 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
570 int err;
572 err = drbd_recv_all(connection, buf, size);
573 if (err && !signal_pending(current))
574 drbd_warn(connection, "short read (expected size %d)\n", (int)size);
575 return err;
578 /* quoting tcp(7):
579 * On individual connections, the socket buffer size must be set prior to the
580 * listen(2) or connect(2) calls in order to have it take effect.
581 * This is our wrapper to do so.
583 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
584 unsigned int rcv)
586 /* open coded SO_SNDBUF, SO_RCVBUF */
587 if (snd) {
588 sock->sk->sk_sndbuf = snd;
589 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
591 if (rcv) {
592 sock->sk->sk_rcvbuf = rcv;
593 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
/*
 * Actively try to connect to the peer.
 *
 * Creates a TCP socket, binds it to the configured local address (with the
 * port forced to 0) and connects to the configured peer address.
 *
 * Returns the connected socket, or NULL if there is no net_conf or any step
 * failed. Failures of socket creation or bind that are not in the
 * "transient" errno list below request C_DISCONNECTING; a failing connect()
 * never does.
 */
static struct socket *drbd_try_connect(struct drbd_connection *connection)
{
	const char *what;
	struct socket *sock;
	struct sockaddr_in6 src_in6;
	struct sockaddr_in6 peer_in6;
	struct net_conf *nc;
	int err, peer_addr_len, my_addr_len;
	int sndbuf_size, rcvbuf_size, connect_int;
	int disconnect_on_error = 1;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return NULL;
	}
	sndbuf_size = nc->sndbuf_size;
	rcvbuf_size = nc->rcvbuf_size;
	connect_int = nc->connect_int;
	rcu_read_unlock();

	my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
	memcpy(&src_in6, &connection->my_addr, my_addr_len);

	if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
		src_in6.sin6_port = 0;
	else
		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

	/* clamp against the buffer we actually copy into */
	peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(peer_in6));
	memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);

	what = "sock_create_kern";
	err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &sock);
	if (err < 0) {
		sock = NULL;
		goto out;
	}

	sock->sk->sk_rcvtimeo =
	sock->sk->sk_sndtimeo = connect_int * HZ;
	drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);

	/* explicitly bind to the configured IP as source IP
	 * for the outgoing connections.
	 * This is needed for multihomed hosts and to be
	 * able to use lo: interfaces for drbd.
	 * Make sure to use 0 as port number, so linux selects
	 * a free one dynamically.
	 */
	what = "bind before connect";
	err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
	if (err < 0)
		goto out;

	/* connect may fail, peer not yet available.
	 * stay C_WF_CONNECTION, don't go Disconnecting! */
	disconnect_on_error = 0;
	what = "connect";
	err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);

out:
	if (err < 0) {
		if (sock) {
			sock_release(sock);
			sock = NULL;
		}
		switch (-err) {
			/* timeout, busy, signal pending */
		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
		case EINTR: case ERESTARTSYS:
			/* peer not (yet) available, network problem */
		case ECONNREFUSED: case ENETUNREACH:
		case EHOSTDOWN: case EHOSTUNREACH:
			disconnect_on_error = 0;
			break;
		default:
			drbd_err(connection, "%s failed, err = %d\n", what, err);
		}
		if (disconnect_on_error)
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
	}

	return sock;
}
685 struct accept_wait_data {
686 struct drbd_connection *connection;
687 struct socket *s_listen;
688 struct completion door_bell;
689 void (*original_sk_state_change)(struct sock *sk);
693 static void drbd_incoming_connection(struct sock *sk)
695 struct accept_wait_data *ad = sk->sk_user_data;
696 void (*state_change)(struct sock *sk);
698 state_change = ad->original_sk_state_change;
699 if (sk->sk_state == TCP_ESTABLISHED)
700 complete(&ad->door_bell);
701 state_change(sk);
704 static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
706 int err, sndbuf_size, rcvbuf_size, my_addr_len;
707 struct sockaddr_in6 my_addr;
708 struct socket *s_listen;
709 struct net_conf *nc;
710 const char *what;
712 rcu_read_lock();
713 nc = rcu_dereference(connection->net_conf);
714 if (!nc) {
715 rcu_read_unlock();
716 return -EIO;
718 sndbuf_size = nc->sndbuf_size;
719 rcvbuf_size = nc->rcvbuf_size;
720 rcu_read_unlock();
722 my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
723 memcpy(&my_addr, &connection->my_addr, my_addr_len);
725 what = "sock_create_kern";
726 err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
727 SOCK_STREAM, IPPROTO_TCP, &s_listen);
728 if (err) {
729 s_listen = NULL;
730 goto out;
733 s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
734 drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
736 what = "bind before listen";
737 err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
738 if (err < 0)
739 goto out;
741 ad->s_listen = s_listen;
742 write_lock_bh(&s_listen->sk->sk_callback_lock);
743 ad->original_sk_state_change = s_listen->sk->sk_state_change;
744 s_listen->sk->sk_state_change = drbd_incoming_connection;
745 s_listen->sk->sk_user_data = ad;
746 write_unlock_bh(&s_listen->sk->sk_callback_lock);
748 what = "listen";
749 err = s_listen->ops->listen(s_listen, 5);
750 if (err < 0)
751 goto out;
753 return 0;
754 out:
755 if (s_listen)
756 sock_release(s_listen);
757 if (err < 0) {
758 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
759 drbd_err(connection, "%s failed, err = %d\n", what, err);
760 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
764 return -EIO;
767 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
769 write_lock_bh(&sk->sk_callback_lock);
770 sk->sk_state_change = ad->original_sk_state_change;
771 sk->sk_user_data = NULL;
772 write_unlock_bh(&sk->sk_callback_lock);
775 static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
777 int timeo, connect_int, err = 0;
778 struct socket *s_estab = NULL;
779 struct net_conf *nc;
781 rcu_read_lock();
782 nc = rcu_dereference(connection->net_conf);
783 if (!nc) {
784 rcu_read_unlock();
785 return NULL;
787 connect_int = nc->connect_int;
788 rcu_read_unlock();
790 timeo = connect_int * HZ;
791 /* 28.5% random jitter */
792 timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
794 err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
795 if (err <= 0)
796 return NULL;
798 err = kernel_accept(ad->s_listen, &s_estab, 0);
799 if (err < 0) {
800 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
801 drbd_err(connection, "accept failed, err = %d\n", err);
802 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
806 if (s_estab)
807 unregister_state_change(s_estab->sk, ad);
809 return s_estab;
/* Forward declaration; decode_header() is defined after conn_connect(). */
static int decode_header(struct drbd_connection *connection, void *header,
			 struct packet_info *pi);
814 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
815 enum drbd_packet cmd)
817 if (!conn_prepare_command(connection, sock))
818 return -EIO;
819 return conn_send_command(connection, sock, cmd, 0, NULL, 0);
822 static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
824 unsigned int header_size = drbd_header_size(connection);
825 struct packet_info pi;
826 struct net_conf *nc;
827 int err;
829 rcu_read_lock();
830 nc = rcu_dereference(connection->net_conf);
831 if (!nc) {
832 rcu_read_unlock();
833 return -EIO;
835 sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
836 rcu_read_unlock();
838 err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
839 if (err != header_size) {
840 if (err >= 0)
841 err = -EIO;
842 return err;
844 err = decode_header(connection, connection->data.rbuf, &pi);
845 if (err)
846 return err;
847 return pi.cmd;
851 * drbd_socket_okay() - Free the socket if its connection is not okay
852 * @sock: pointer to the pointer to the socket.
854 static bool drbd_socket_okay(struct socket **sock)
856 int rr;
857 char tb[4];
859 if (!*sock)
860 return false;
862 rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
864 if (rr > 0 || rr == -EAGAIN) {
865 return true;
866 } else {
867 sock_release(*sock);
868 *sock = NULL;
869 return false;
873 static bool connection_established(struct drbd_connection *connection,
874 struct socket **sock1,
875 struct socket **sock2)
877 struct net_conf *nc;
878 int timeout;
879 bool ok;
881 if (!*sock1 || !*sock2)
882 return false;
884 rcu_read_lock();
885 nc = rcu_dereference(connection->net_conf);
886 timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
887 rcu_read_unlock();
888 schedule_timeout_interruptible(timeout);
890 ok = drbd_socket_okay(sock1);
891 ok = drbd_socket_okay(sock2) && ok;
893 return ok;
896 /* Gets called if a connection is established, or if a new minor gets created
897 in a connection */
898 int drbd_connected(struct drbd_peer_device *peer_device)
900 struct drbd_device *device = peer_device->device;
901 int err;
903 atomic_set(&device->packet_seq, 0);
904 device->peer_seq = 0;
906 device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
907 &peer_device->connection->cstate_mutex :
908 &device->own_state_mutex;
910 err = drbd_send_sync_param(peer_device);
911 if (!err)
912 err = drbd_send_sizes(peer_device, 0, 0);
913 if (!err)
914 err = drbd_send_uuids(peer_device);
915 if (!err)
916 err = drbd_send_current_state(peer_device);
917 clear_bit(USE_DEGR_WFC_T, &device->flags);
918 clear_bit(RESIZE_PENDING, &device->flags);
919 atomic_set(&device->ap_in_flight, 0);
920 mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
921 return err;
925 * return values:
926 * 1 yes, we have a valid connection
927 * 0 oops, did not work out, please try again
928 * -1 peer talks different language,
929 * no point in trying again, please go standalone.
930 * -2 We do not have a network config...
932 static int conn_connect(struct drbd_connection *connection)
934 struct drbd_socket sock, msock;
935 struct drbd_peer_device *peer_device;
936 struct net_conf *nc;
937 int vnr, timeout, h;
938 bool discard_my_data, ok;
939 enum drbd_state_rv rv;
940 struct accept_wait_data ad = {
941 .connection = connection,
942 .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
945 clear_bit(DISCONNECT_SENT, &connection->flags);
946 if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
947 return -2;
949 mutex_init(&sock.mutex);
950 sock.sbuf = connection->data.sbuf;
951 sock.rbuf = connection->data.rbuf;
952 sock.socket = NULL;
953 mutex_init(&msock.mutex);
954 msock.sbuf = connection->meta.sbuf;
955 msock.rbuf = connection->meta.rbuf;
956 msock.socket = NULL;
958 /* Assume that the peer only understands protocol 80 until we know better. */
959 connection->agreed_pro_version = 80;
961 if (prepare_listen_socket(connection, &ad))
962 return 0;
964 do {
965 struct socket *s;
967 s = drbd_try_connect(connection);
968 if (s) {
969 if (!sock.socket) {
970 sock.socket = s;
971 send_first_packet(connection, &sock, P_INITIAL_DATA);
972 } else if (!msock.socket) {
973 clear_bit(RESOLVE_CONFLICTS, &connection->flags);
974 msock.socket = s;
975 send_first_packet(connection, &msock, P_INITIAL_META);
976 } else {
977 drbd_err(connection, "Logic error in conn_connect()\n");
978 goto out_release_sockets;
982 if (connection_established(connection, &sock.socket, &msock.socket))
983 break;
985 retry:
986 s = drbd_wait_for_connect(connection, &ad);
987 if (s) {
988 int fp = receive_first_packet(connection, s);
989 drbd_socket_okay(&sock.socket);
990 drbd_socket_okay(&msock.socket);
991 switch (fp) {
992 case P_INITIAL_DATA:
993 if (sock.socket) {
994 drbd_warn(connection, "initial packet S crossed\n");
995 sock_release(sock.socket);
996 sock.socket = s;
997 goto randomize;
999 sock.socket = s;
1000 break;
1001 case P_INITIAL_META:
1002 set_bit(RESOLVE_CONFLICTS, &connection->flags);
1003 if (msock.socket) {
1004 drbd_warn(connection, "initial packet M crossed\n");
1005 sock_release(msock.socket);
1006 msock.socket = s;
1007 goto randomize;
1009 msock.socket = s;
1010 break;
1011 default:
1012 drbd_warn(connection, "Error receiving initial packet\n");
1013 sock_release(s);
1014 randomize:
1015 if (prandom_u32() & 1)
1016 goto retry;
1020 if (connection->cstate <= C_DISCONNECTING)
1021 goto out_release_sockets;
1022 if (signal_pending(current)) {
1023 flush_signals(current);
1024 smp_rmb();
1025 if (get_t_state(&connection->receiver) == EXITING)
1026 goto out_release_sockets;
1029 ok = connection_established(connection, &sock.socket, &msock.socket);
1030 } while (!ok);
1032 if (ad.s_listen)
1033 sock_release(ad.s_listen);
1035 sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1036 msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1038 sock.socket->sk->sk_allocation = GFP_NOIO;
1039 msock.socket->sk->sk_allocation = GFP_NOIO;
1041 sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
1042 msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
1044 /* NOT YET ...
1045 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
1046 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1047 * first set it to the P_CONNECTION_FEATURES timeout,
1048 * which we set to 4x the configured ping_timeout. */
1049 rcu_read_lock();
1050 nc = rcu_dereference(connection->net_conf);
1052 sock.socket->sk->sk_sndtimeo =
1053 sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
1055 msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1056 timeout = nc->timeout * HZ / 10;
1057 discard_my_data = nc->discard_my_data;
1058 rcu_read_unlock();
1060 msock.socket->sk->sk_sndtimeo = timeout;
1062 /* we don't want delays.
1063 * we use TCP_CORK where appropriate, though */
1064 drbd_tcp_nodelay(sock.socket);
1065 drbd_tcp_nodelay(msock.socket);
1067 connection->data.socket = sock.socket;
1068 connection->meta.socket = msock.socket;
1069 connection->last_received = jiffies;
1071 h = drbd_do_features(connection);
1072 if (h <= 0)
1073 return h;
1075 if (connection->cram_hmac_tfm) {
1076 /* drbd_request_state(device, NS(conn, WFAuth)); */
1077 switch (drbd_do_auth(connection)) {
1078 case -1:
1079 drbd_err(connection, "Authentication of peer failed\n");
1080 return -1;
1081 case 0:
1082 drbd_err(connection, "Authentication of peer failed, trying again.\n");
1083 return 0;
1087 connection->data.socket->sk->sk_sndtimeo = timeout;
1088 connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1090 if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1091 return -1;
1093 /* Prevent a race between resync-handshake and
1094 * being promoted to Primary.
1096 * Grab and release the state mutex, so we know that any current
1097 * drbd_set_role() is finished, and any incoming drbd_set_role
1098 * will see the STATE_SENT flag, and wait for it to be cleared.
1100 idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1101 mutex_lock(peer_device->device->state_mutex);
1103 set_bit(STATE_SENT, &connection->flags);
1105 idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1106 mutex_unlock(peer_device->device->state_mutex);
1108 rcu_read_lock();
1109 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1110 struct drbd_device *device = peer_device->device;
1111 kref_get(&device->kref);
1112 rcu_read_unlock();
1114 if (discard_my_data)
1115 set_bit(DISCARD_MY_DATA, &device->flags);
1116 else
1117 clear_bit(DISCARD_MY_DATA, &device->flags);
1119 drbd_connected(peer_device);
1120 kref_put(&device->kref, drbd_destroy_device);
1121 rcu_read_lock();
1123 rcu_read_unlock();
1125 rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1126 if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1127 clear_bit(STATE_SENT, &connection->flags);
1128 return 0;
1131 drbd_thread_start(&connection->ack_receiver);
1132 /* opencoded create_singlethread_workqueue(),
1133 * to be able to use format string arguments */
1134 connection->ack_sender =
1135 alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name);
1136 if (!connection->ack_sender) {
1137 drbd_err(connection, "Failed to create workqueue ack_sender\n");
1138 return 0;
1141 mutex_lock(&connection->resource->conf_update);
1142 /* The discard_my_data flag is a single-shot modifier to the next
1143 * connection attempt, the handshake of which is now well underway.
1144 * No need for rcu style copying of the whole struct
1145 * just to clear a single value. */
1146 connection->net_conf->discard_my_data = 0;
1147 mutex_unlock(&connection->resource->conf_update);
1149 return h;
1151 out_release_sockets:
1152 if (ad.s_listen)
1153 sock_release(ad.s_listen);
1154 if (sock.socket)
1155 sock_release(sock.socket);
1156 if (msock.socket)
1157 sock_release(msock.socket);
1158 return -1;
1161 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1163 unsigned int header_size = drbd_header_size(connection);
1165 if (header_size == sizeof(struct p_header100) &&
1166 *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1167 struct p_header100 *h = header;
1168 if (h->pad != 0) {
1169 drbd_err(connection, "Header padding is not zero\n");
1170 return -EINVAL;
1172 pi->vnr = be16_to_cpu(h->volume);
1173 pi->cmd = be16_to_cpu(h->command);
1174 pi->size = be32_to_cpu(h->length);
1175 } else if (header_size == sizeof(struct p_header95) &&
1176 *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1177 struct p_header95 *h = header;
1178 pi->cmd = be16_to_cpu(h->command);
1179 pi->size = be32_to_cpu(h->length);
1180 pi->vnr = 0;
1181 } else if (header_size == sizeof(struct p_header80) &&
1182 *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1183 struct p_header80 *h = header;
1184 pi->cmd = be16_to_cpu(h->command);
1185 pi->size = be16_to_cpu(h->length);
1186 pi->vnr = 0;
1187 } else {
1188 drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1189 be32_to_cpu(*(__be32 *)header),
1190 connection->agreed_pro_version);
1191 return -EINVAL;
1193 pi->data = header + header_size;
1194 return 0;
1197 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1199 void *buffer = connection->data.rbuf;
1200 int err;
1202 err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1203 if (err)
1204 return err;
1206 err = decode_header(connection, buffer, pi);
1207 connection->last_received = jiffies;
1209 return err;
1212 /* This is blkdev_issue_flush, but asynchronous.
1213 * We want to submit to all component volumes in parallel,
1214 * then wait for all completions.
1216 struct issue_flush_context {
1217 atomic_t pending;
1218 int error;
1219 struct completion done;
1221 struct one_flush_context {
1222 struct drbd_device *device;
1223 struct issue_flush_context *ctx;
1226 void one_flush_endio(struct bio *bio)
1228 struct one_flush_context *octx = bio->bi_private;
1229 struct drbd_device *device = octx->device;
1230 struct issue_flush_context *ctx = octx->ctx;
1232 if (bio->bi_status) {
1233 ctx->error = blk_status_to_errno(bio->bi_status);
1234 drbd_info(device, "local disk FLUSH FAILED with status %d\n", bio->bi_status);
1236 kfree(octx);
1237 bio_put(bio);
1239 clear_bit(FLUSH_PENDING, &device->flags);
1240 put_ldev(device);
1241 kref_put(&device->kref, drbd_destroy_device);
1243 if (atomic_dec_and_test(&ctx->pending))
1244 complete(&ctx->done);
1247 static void submit_one_flush(struct drbd_device *device, struct issue_flush_context *ctx)
1249 struct bio *bio = bio_alloc(GFP_NOIO, 0);
1250 struct one_flush_context *octx = kmalloc(sizeof(*octx), GFP_NOIO);
1251 if (!bio || !octx) {
1252 drbd_warn(device, "Could not allocate a bio, CANNOT ISSUE FLUSH\n");
1253 /* FIXME: what else can I do now? disconnecting or detaching
1254 * really does not help to improve the state of the world, either.
1256 kfree(octx);
1257 if (bio)
1258 bio_put(bio);
1260 ctx->error = -ENOMEM;
1261 put_ldev(device);
1262 kref_put(&device->kref, drbd_destroy_device);
1263 return;
1266 octx->device = device;
1267 octx->ctx = ctx;
1268 bio->bi_bdev = device->ldev->backing_bdev;
1269 bio->bi_private = octx;
1270 bio->bi_end_io = one_flush_endio;
1271 bio->bi_opf = REQ_OP_FLUSH | REQ_PREFLUSH;
1273 device->flush_jif = jiffies;
1274 set_bit(FLUSH_PENDING, &device->flags);
1275 atomic_inc(&ctx->pending);
1276 submit_bio(bio);
1279 static void drbd_flush(struct drbd_connection *connection)
1281 if (connection->resource->write_ordering >= WO_BDEV_FLUSH) {
1282 struct drbd_peer_device *peer_device;
1283 struct issue_flush_context ctx;
1284 int vnr;
1286 atomic_set(&ctx.pending, 1);
1287 ctx.error = 0;
1288 init_completion(&ctx.done);
1290 rcu_read_lock();
1291 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1292 struct drbd_device *device = peer_device->device;
1294 if (!get_ldev(device))
1295 continue;
1296 kref_get(&device->kref);
1297 rcu_read_unlock();
1299 submit_one_flush(device, &ctx);
1301 rcu_read_lock();
1303 rcu_read_unlock();
1305 /* Do we want to add a timeout,
1306 * if disk-timeout is set? */
1307 if (!atomic_dec_and_test(&ctx.pending))
1308 wait_for_completion(&ctx.done);
1310 if (ctx.error) {
1311 /* would rather check on EOPNOTSUPP, but that is not reliable.
1312 * don't try again for ANY return value != 0
1313 * if (rv == -EOPNOTSUPP) */
1314 /* Any error is already reported by bio_endio callback. */
1315 drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
1321 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1322 * @device: DRBD device.
1323 * @epoch: Epoch object.
1324 * @ev: Epoch event.
1326 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1327 struct drbd_epoch *epoch,
1328 enum epoch_event ev)
1330 int epoch_size;
1331 struct drbd_epoch *next_epoch;
1332 enum finish_epoch rv = FE_STILL_LIVE;
1334 spin_lock(&connection->epoch_lock);
1335 do {
1336 next_epoch = NULL;
1338 epoch_size = atomic_read(&epoch->epoch_size);
1340 switch (ev & ~EV_CLEANUP) {
1341 case EV_PUT:
1342 atomic_dec(&epoch->active);
1343 break;
1344 case EV_GOT_BARRIER_NR:
1345 set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1346 break;
1347 case EV_BECAME_LAST:
1348 /* nothing to do*/
1349 break;
1352 if (epoch_size != 0 &&
1353 atomic_read(&epoch->active) == 0 &&
1354 (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1355 if (!(ev & EV_CLEANUP)) {
1356 spin_unlock(&connection->epoch_lock);
1357 drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1358 spin_lock(&connection->epoch_lock);
1360 #if 0
1361 /* FIXME: dec unacked on connection, once we have
1362 * something to count pending connection packets in. */
1363 if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1364 dec_unacked(epoch->connection);
1365 #endif
1367 if (connection->current_epoch != epoch) {
1368 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1369 list_del(&epoch->list);
1370 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1371 connection->epochs--;
1372 kfree(epoch);
1374 if (rv == FE_STILL_LIVE)
1375 rv = FE_DESTROYED;
1376 } else {
1377 epoch->flags = 0;
1378 atomic_set(&epoch->epoch_size, 0);
1379 /* atomic_set(&epoch->active, 0); is already zero */
1380 if (rv == FE_STILL_LIVE)
1381 rv = FE_RECYCLED;
1385 if (!next_epoch)
1386 break;
1388 epoch = next_epoch;
1389 } while (1);
1391 spin_unlock(&connection->epoch_lock);
1393 return rv;
1396 static enum write_ordering_e
1397 max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
1399 struct disk_conf *dc;
1401 dc = rcu_dereference(bdev->disk_conf);
1403 if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
1404 wo = WO_DRAIN_IO;
1405 if (wo == WO_DRAIN_IO && !dc->disk_drain)
1406 wo = WO_NONE;
1408 return wo;
1412 * drbd_bump_write_ordering() - Fall back to an other write ordering method
1413 * @connection: DRBD connection.
1414 * @wo: Write ordering method to try.
1416 void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
1417 enum write_ordering_e wo)
1419 struct drbd_device *device;
1420 enum write_ordering_e pwo;
1421 int vnr;
1422 static char *write_ordering_str[] = {
1423 [WO_NONE] = "none",
1424 [WO_DRAIN_IO] = "drain",
1425 [WO_BDEV_FLUSH] = "flush",
1428 pwo = resource->write_ordering;
1429 if (wo != WO_BDEV_FLUSH)
1430 wo = min(pwo, wo);
1431 rcu_read_lock();
1432 idr_for_each_entry(&resource->devices, device, vnr) {
1433 if (get_ldev(device)) {
1434 wo = max_allowed_wo(device->ldev, wo);
1435 if (device->ldev == bdev)
1436 bdev = NULL;
1437 put_ldev(device);
1441 if (bdev)
1442 wo = max_allowed_wo(bdev, wo);
1444 rcu_read_unlock();
1446 resource->write_ordering = wo;
1447 if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH)
1448 drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
1451 static void drbd_issue_peer_discard(struct drbd_device *device, struct drbd_peer_request *peer_req)
1453 struct block_device *bdev = device->ldev->backing_bdev;
1455 if (blkdev_issue_zeroout(bdev, peer_req->i.sector, peer_req->i.size >> 9,
1456 GFP_NOIO, 0))
1457 peer_req->flags |= EE_WAS_ERROR;
1459 drbd_endio_write_sec_final(peer_req);
1462 static void drbd_issue_peer_wsame(struct drbd_device *device,
1463 struct drbd_peer_request *peer_req)
1465 struct block_device *bdev = device->ldev->backing_bdev;
1466 sector_t s = peer_req->i.sector;
1467 sector_t nr = peer_req->i.size >> 9;
1468 if (blkdev_issue_write_same(bdev, s, nr, GFP_NOIO, peer_req->pages))
1469 peer_req->flags |= EE_WAS_ERROR;
1470 drbd_endio_write_sec_final(peer_req);
1475 * drbd_submit_peer_request()
1476 * @device: DRBD device.
1477 * @peer_req: peer request
1478 * @rw: flag field, see bio->bi_opf
1480 * May spread the pages to multiple bios,
1481 * depending on bio_add_page restrictions.
1483 * Returns 0 if all bios have been submitted,
1484 * -ENOMEM if we could not allocate enough bios,
1485 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1486 * single page to an empty bio (which should never happen and likely indicates
1487 * that the lower level IO stack is in some way broken). This has been observed
1488 * on certain Xen deployments.
1490 /* TODO allocate from our own bio_set. */
1491 int drbd_submit_peer_request(struct drbd_device *device,
1492 struct drbd_peer_request *peer_req,
1493 const unsigned op, const unsigned op_flags,
1494 const int fault_type)
1496 struct bio *bios = NULL;
1497 struct bio *bio;
1498 struct page *page = peer_req->pages;
1499 sector_t sector = peer_req->i.sector;
1500 unsigned data_size = peer_req->i.size;
1501 unsigned n_bios = 0;
1502 unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
1503 int err = -ENOMEM;
1505 /* TRIM/DISCARD: for now, always use the helper function
1506 * blkdev_issue_zeroout(..., discard=true).
1507 * It's synchronous, but it does the right thing wrt. bio splitting.
1508 * Correctness first, performance later. Next step is to code an
1509 * asynchronous variant of the same.
1511 if (peer_req->flags & (EE_IS_TRIM|EE_WRITE_SAME)) {
1512 /* wait for all pending IO completions, before we start
1513 * zeroing things out. */
1514 conn_wait_active_ee_empty(peer_req->peer_device->connection);
1515 /* add it to the active list now,
1516 * so we can find it to present it in debugfs */
1517 peer_req->submit_jif = jiffies;
1518 peer_req->flags |= EE_SUBMITTED;
1520 /* If this was a resync request from receive_rs_deallocated(),
1521 * it is already on the sync_ee list */
1522 if (list_empty(&peer_req->w.list)) {
1523 spin_lock_irq(&device->resource->req_lock);
1524 list_add_tail(&peer_req->w.list, &device->active_ee);
1525 spin_unlock_irq(&device->resource->req_lock);
1528 if (peer_req->flags & EE_IS_TRIM)
1529 drbd_issue_peer_discard(device, peer_req);
1530 else /* EE_WRITE_SAME */
1531 drbd_issue_peer_wsame(device, peer_req);
1532 return 0;
1535 /* In most cases, we will only need one bio. But in case the lower
1536 * level restrictions happen to be different at this offset on this
1537 * side than those of the sending peer, we may need to submit the
1538 * request in more than one bio.
1540 * Plain bio_alloc is good enough here, this is no DRBD internally
1541 * generated bio, but a bio allocated on behalf of the peer.
1543 next_bio:
1544 bio = bio_alloc(GFP_NOIO, nr_pages);
1545 if (!bio) {
1546 drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
1547 goto fail;
1549 /* > peer_req->i.sector, unless this is the first bio */
1550 bio->bi_iter.bi_sector = sector;
1551 bio->bi_bdev = device->ldev->backing_bdev;
1552 bio_set_op_attrs(bio, op, op_flags);
1553 bio->bi_private = peer_req;
1554 bio->bi_end_io = drbd_peer_request_endio;
1556 bio->bi_next = bios;
1557 bios = bio;
1558 ++n_bios;
1560 page_chain_for_each(page) {
1561 unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
1562 if (!bio_add_page(bio, page, len, 0))
1563 goto next_bio;
1564 data_size -= len;
1565 sector += len >> 9;
1566 --nr_pages;
1568 D_ASSERT(device, data_size == 0);
1569 D_ASSERT(device, page == NULL);
1571 atomic_set(&peer_req->pending_bios, n_bios);
1572 /* for debugfs: update timestamp, mark as submitted */
1573 peer_req->submit_jif = jiffies;
1574 peer_req->flags |= EE_SUBMITTED;
1575 do {
1576 bio = bios;
1577 bios = bios->bi_next;
1578 bio->bi_next = NULL;
1580 drbd_generic_make_request(device, fault_type, bio);
1581 } while (bios);
1582 return 0;
1584 fail:
1585 while (bios) {
1586 bio = bios;
1587 bios = bios->bi_next;
1588 bio_put(bio);
1590 return err;
1593 static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1594 struct drbd_peer_request *peer_req)
1596 struct drbd_interval *i = &peer_req->i;
1598 drbd_remove_interval(&device->write_requests, i);
1599 drbd_clear_interval(i);
1601 /* Wake up any processes waiting for this peer request to complete. */
1602 if (i->waiting)
1603 wake_up(&device->misc_wait);
1606 static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1608 struct drbd_peer_device *peer_device;
1609 int vnr;
1611 rcu_read_lock();
1612 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1613 struct drbd_device *device = peer_device->device;
1615 kref_get(&device->kref);
1616 rcu_read_unlock();
1617 drbd_wait_ee_list_empty(device, &device->active_ee);
1618 kref_put(&device->kref, drbd_destroy_device);
1619 rcu_read_lock();
1621 rcu_read_unlock();
1624 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1626 int rv;
1627 struct p_barrier *p = pi->data;
1628 struct drbd_epoch *epoch;
1630 /* FIXME these are unacked on connection,
1631 * not a specific (peer)device.
1633 connection->current_epoch->barrier_nr = p->barrier;
1634 connection->current_epoch->connection = connection;
1635 rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1637 /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1638 * the activity log, which means it would not be resynced in case the
1639 * R_PRIMARY crashes now.
1640 * Therefore we must send the barrier_ack after the barrier request was
1641 * completed. */
1642 switch (connection->resource->write_ordering) {
1643 case WO_NONE:
1644 if (rv == FE_RECYCLED)
1645 return 0;
1647 /* receiver context, in the writeout path of the other node.
1648 * avoid potential distributed deadlock */
1649 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1650 if (epoch)
1651 break;
1652 else
1653 drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1654 /* Fall through */
1656 case WO_BDEV_FLUSH:
1657 case WO_DRAIN_IO:
1658 conn_wait_active_ee_empty(connection);
1659 drbd_flush(connection);
1661 if (atomic_read(&connection->current_epoch->epoch_size)) {
1662 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1663 if (epoch)
1664 break;
1667 return 0;
1668 default:
1669 drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
1670 connection->resource->write_ordering);
1671 return -EIO;
1674 epoch->flags = 0;
1675 atomic_set(&epoch->epoch_size, 0);
1676 atomic_set(&epoch->active, 0);
1678 spin_lock(&connection->epoch_lock);
1679 if (atomic_read(&connection->current_epoch->epoch_size)) {
1680 list_add(&epoch->list, &connection->current_epoch->list);
1681 connection->current_epoch = epoch;
1682 connection->epochs++;
1683 } else {
1684 /* The current_epoch got recycled while we allocated this one... */
1685 kfree(epoch);
1687 spin_unlock(&connection->epoch_lock);
1689 return 0;
1692 /* quick wrapper in case payload size != request_size (write same) */
1693 static void drbd_csum_ee_size(struct crypto_ahash *h,
1694 struct drbd_peer_request *r, void *d,
1695 unsigned int payload_size)
1697 unsigned int tmp = r->i.size;
1698 r->i.size = payload_size;
1699 drbd_csum_ee(h, r, d);
1700 r->i.size = tmp;
1703 /* used from receive_RSDataReply (recv_resync_read)
1704 * and from receive_Data.
1705 * data_size: actual payload ("data in")
1706 * for normal writes that is bi_size.
1707 * for discards, that is zero.
1708 * for write same, it is logical_block_size.
1709 * both trim and write same have the bi_size ("data len to be affected")
1710 * as extra argument in the packet header.
1712 static struct drbd_peer_request *
1713 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1714 struct packet_info *pi) __must_hold(local)
1716 struct drbd_device *device = peer_device->device;
1717 const sector_t capacity = drbd_get_capacity(device->this_bdev);
1718 struct drbd_peer_request *peer_req;
1719 struct page *page;
1720 int digest_size, err;
1721 unsigned int data_size = pi->size, ds;
1722 void *dig_in = peer_device->connection->int_dig_in;
1723 void *dig_vv = peer_device->connection->int_dig_vv;
1724 unsigned long *data;
1725 struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1726 struct p_trim *wsame = (pi->cmd == P_WSAME) ? pi->data : NULL;
1728 digest_size = 0;
1729 if (!trim && peer_device->connection->peer_integrity_tfm) {
1730 digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
1732 * FIXME: Receive the incoming digest into the receive buffer
1733 * here, together with its struct p_data?
1735 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1736 if (err)
1737 return NULL;
1738 data_size -= digest_size;
1741 /* assume request_size == data_size, but special case trim and wsame. */
1742 ds = data_size;
1743 if (trim) {
1744 if (!expect(data_size == 0))
1745 return NULL;
1746 ds = be32_to_cpu(trim->size);
1747 } else if (wsame) {
1748 if (data_size != queue_logical_block_size(device->rq_queue)) {
1749 drbd_err(peer_device, "data size (%u) != drbd logical block size (%u)\n",
1750 data_size, queue_logical_block_size(device->rq_queue));
1751 return NULL;
1753 if (data_size != bdev_logical_block_size(device->ldev->backing_bdev)) {
1754 drbd_err(peer_device, "data size (%u) != backend logical block size (%u)\n",
1755 data_size, bdev_logical_block_size(device->ldev->backing_bdev));
1756 return NULL;
1758 ds = be32_to_cpu(wsame->size);
1761 if (!expect(IS_ALIGNED(ds, 512)))
1762 return NULL;
1763 if (trim || wsame) {
1764 if (!expect(ds <= (DRBD_MAX_BBIO_SECTORS << 9)))
1765 return NULL;
1766 } else if (!expect(ds <= DRBD_MAX_BIO_SIZE))
1767 return NULL;
1769 /* even though we trust out peer,
1770 * we sometimes have to double check. */
1771 if (sector + (ds>>9) > capacity) {
1772 drbd_err(device, "request from peer beyond end of local disk: "
1773 "capacity: %llus < sector: %llus + size: %u\n",
1774 (unsigned long long)capacity,
1775 (unsigned long long)sector, ds);
1776 return NULL;
1779 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1780 * "criss-cross" setup, that might cause write-out on some other DRBD,
1781 * which in turn might block on the other node at this very place. */
1782 peer_req = drbd_alloc_peer_req(peer_device, id, sector, ds, data_size, GFP_NOIO);
1783 if (!peer_req)
1784 return NULL;
1786 peer_req->flags |= EE_WRITE;
1787 if (trim) {
1788 peer_req->flags |= EE_IS_TRIM;
1789 return peer_req;
1791 if (wsame)
1792 peer_req->flags |= EE_WRITE_SAME;
1794 /* receive payload size bytes into page chain */
1795 ds = data_size;
1796 page = peer_req->pages;
1797 page_chain_for_each(page) {
1798 unsigned len = min_t(int, ds, PAGE_SIZE);
1799 data = kmap(page);
1800 err = drbd_recv_all_warn(peer_device->connection, data, len);
1801 if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1802 drbd_err(device, "Fault injection: Corrupting data on receive\n");
1803 data[0] = data[0] ^ (unsigned long)-1;
1805 kunmap(page);
1806 if (err) {
1807 drbd_free_peer_req(device, peer_req);
1808 return NULL;
1810 ds -= len;
1813 if (digest_size) {
1814 drbd_csum_ee_size(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv, data_size);
1815 if (memcmp(dig_in, dig_vv, digest_size)) {
1816 drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1817 (unsigned long long)sector, data_size);
1818 drbd_free_peer_req(device, peer_req);
1819 return NULL;
1822 device->recv_cnt += data_size >> 9;
1823 return peer_req;
1826 /* drbd_drain_block() just takes a data block
1827 * out of the socket input buffer, and discards it.
1829 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1831 struct page *page;
1832 int err = 0;
1833 void *data;
1835 if (!data_size)
1836 return 0;
1838 page = drbd_alloc_pages(peer_device, 1, 1);
1840 data = kmap(page);
1841 while (data_size) {
1842 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1844 err = drbd_recv_all_warn(peer_device->connection, data, len);
1845 if (err)
1846 break;
1847 data_size -= len;
1849 kunmap(page);
1850 drbd_free_pages(peer_device->device, page, 0);
1851 return err;
1854 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1855 sector_t sector, int data_size)
1857 struct bio_vec bvec;
1858 struct bvec_iter iter;
1859 struct bio *bio;
1860 int digest_size, err, expect;
1861 void *dig_in = peer_device->connection->int_dig_in;
1862 void *dig_vv = peer_device->connection->int_dig_vv;
1864 digest_size = 0;
1865 if (peer_device->connection->peer_integrity_tfm) {
1866 digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
1867 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1868 if (err)
1869 return err;
1870 data_size -= digest_size;
1873 /* optimistically update recv_cnt. if receiving fails below,
1874 * we disconnect anyways, and counters will be reset. */
1875 peer_device->device->recv_cnt += data_size>>9;
1877 bio = req->master_bio;
1878 D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
1880 bio_for_each_segment(bvec, bio, iter) {
1881 void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1882 expect = min_t(int, data_size, bvec.bv_len);
1883 err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1884 kunmap(bvec.bv_page);
1885 if (err)
1886 return err;
1887 data_size -= expect;
1890 if (digest_size) {
1891 drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1892 if (memcmp(dig_in, dig_vv, digest_size)) {
1893 drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
1894 return -EINVAL;
1898 D_ASSERT(peer_device->device, data_size == 0);
1899 return 0;
1903 * e_end_resync_block() is called in ack_sender context via
1904 * drbd_finish_peer_reqs().
1906 static int e_end_resync_block(struct drbd_work *w, int unused)
1908 struct drbd_peer_request *peer_req =
1909 container_of(w, struct drbd_peer_request, w);
1910 struct drbd_peer_device *peer_device = peer_req->peer_device;
1911 struct drbd_device *device = peer_device->device;
1912 sector_t sector = peer_req->i.sector;
1913 int err;
1915 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1917 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1918 drbd_set_in_sync(device, sector, peer_req->i.size);
1919 err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
1920 } else {
1921 /* Record failure to sync */
1922 drbd_rs_failed_io(device, sector, peer_req->i.size);
1924 err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1926 dec_unacked(device);
1928 return err;
1931 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
1932 struct packet_info *pi) __releases(local)
1934 struct drbd_device *device = peer_device->device;
1935 struct drbd_peer_request *peer_req;
1937 peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
1938 if (!peer_req)
1939 goto fail;
1941 dec_rs_pending(device);
1943 inc_unacked(device);
1944 /* corresponding dec_unacked() in e_end_resync_block()
1945 * respective _drbd_clear_done_ee */
1947 peer_req->w.cb = e_end_resync_block;
1948 peer_req->submit_jif = jiffies;
1950 spin_lock_irq(&device->resource->req_lock);
1951 list_add_tail(&peer_req->w.list, &device->sync_ee);
1952 spin_unlock_irq(&device->resource->req_lock);
1954 atomic_add(pi->size >> 9, &device->rs_sect_ev);
1955 if (drbd_submit_peer_request(device, peer_req, REQ_OP_WRITE, 0,
1956 DRBD_FAULT_RS_WR) == 0)
1957 return 0;
1959 /* don't care for the reason here */
1960 drbd_err(device, "submit failed, triggering re-connect\n");
1961 spin_lock_irq(&device->resource->req_lock);
1962 list_del(&peer_req->w.list);
1963 spin_unlock_irq(&device->resource->req_lock);
1965 drbd_free_peer_req(device, peer_req);
1966 fail:
1967 put_ldev(device);
1968 return -EIO;
1971 static struct drbd_request *
1972 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
1973 sector_t sector, bool missing_ok, const char *func)
1975 struct drbd_request *req;
1977 /* Request object according to our peer */
1978 req = (struct drbd_request *)(unsigned long)id;
1979 if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1980 return req;
1981 if (!missing_ok) {
1982 drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
1983 (unsigned long)id, (unsigned long long)sector);
1985 return NULL;
1988 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
1990 struct drbd_peer_device *peer_device;
1991 struct drbd_device *device;
1992 struct drbd_request *req;
1993 sector_t sector;
1994 int err;
1995 struct p_data *p = pi->data;
1997 peer_device = conn_peer_device(connection, pi->vnr);
1998 if (!peer_device)
1999 return -EIO;
2000 device = peer_device->device;
2002 sector = be64_to_cpu(p->sector);
2004 spin_lock_irq(&device->resource->req_lock);
2005 req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
2006 spin_unlock_irq(&device->resource->req_lock);
2007 if (unlikely(!req))
2008 return -EIO;
2010 /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
2011 * special casing it there for the various failure cases.
2012 * still no race with drbd_fail_pending_reads */
2013 err = recv_dless_read(peer_device, req, sector, pi->size);
2014 if (!err)
2015 req_mod(req, DATA_RECEIVED);
2016 /* else: nothing. handled from drbd_disconnect...
2017 * I don't think we may complete this just yet
2018 * in case we are "on-disconnect: freeze" */
2020 return err;
2023 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
2025 struct drbd_peer_device *peer_device;
2026 struct drbd_device *device;
2027 sector_t sector;
2028 int err;
2029 struct p_data *p = pi->data;
2031 peer_device = conn_peer_device(connection, pi->vnr);
2032 if (!peer_device)
2033 return -EIO;
2034 device = peer_device->device;
2036 sector = be64_to_cpu(p->sector);
2037 D_ASSERT(device, p->block_id == ID_SYNCER);
2039 if (get_ldev(device)) {
2040 /* data is submitted to disk within recv_resync_read.
2041 * corresponding put_ldev done below on error,
2042 * or in drbd_peer_request_endio. */
2043 err = recv_resync_read(peer_device, sector, pi);
2044 } else {
2045 if (__ratelimit(&drbd_ratelimit_state))
2046 drbd_err(device, "Can not write resync data to local disk.\n");
2048 err = drbd_drain_block(peer_device, pi->size);
2050 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2053 atomic_add(pi->size >> 9, &device->rs_sect_in);
2055 return err;
2058 static void restart_conflicting_writes(struct drbd_device *device,
2059 sector_t sector, int size)
2061 struct drbd_interval *i;
2062 struct drbd_request *req;
2064 drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2065 if (!i->local)
2066 continue;
2067 req = container_of(i, struct drbd_request, i);
2068 if (req->rq_state & RQ_LOCAL_PENDING ||
2069 !(req->rq_state & RQ_POSTPONED))
2070 continue;
2071 /* as it is RQ_POSTPONED, this will cause it to
2072 * be queued on the retry workqueue. */
2073 __req_mod(req, CONFLICT_RESOLVED, NULL);
2078 * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
2080 static int e_end_block(struct drbd_work *w, int cancel)
2082 struct drbd_peer_request *peer_req =
2083 container_of(w, struct drbd_peer_request, w);
2084 struct drbd_peer_device *peer_device = peer_req->peer_device;
2085 struct drbd_device *device = peer_device->device;
2086 sector_t sector = peer_req->i.sector;
2087 int err = 0, pcmd;
2089 if (peer_req->flags & EE_SEND_WRITE_ACK) {
2090 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2091 pcmd = (device->state.conn >= C_SYNC_SOURCE &&
2092 device->state.conn <= C_PAUSED_SYNC_T &&
2093 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
2094 P_RS_WRITE_ACK : P_WRITE_ACK;
2095 err = drbd_send_ack(peer_device, pcmd, peer_req);
2096 if (pcmd == P_RS_WRITE_ACK)
2097 drbd_set_in_sync(device, sector, peer_req->i.size);
2098 } else {
2099 err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2100 /* we expect it to be marked out of sync anyways...
2101 * maybe assert this? */
2103 dec_unacked(device);
2106 /* we delete from the conflict detection hash _after_ we sent out the
2107 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
2108 if (peer_req->flags & EE_IN_INTERVAL_TREE) {
2109 spin_lock_irq(&device->resource->req_lock);
2110 D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
2111 drbd_remove_epoch_entry_interval(device, peer_req);
2112 if (peer_req->flags & EE_RESTART_REQUESTS)
2113 restart_conflicting_writes(device, sector, peer_req->i.size);
2114 spin_unlock_irq(&device->resource->req_lock);
2115 } else
2116 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2118 drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
2120 return err;
2123 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
2125 struct drbd_peer_request *peer_req =
2126 container_of(w, struct drbd_peer_request, w);
2127 struct drbd_peer_device *peer_device = peer_req->peer_device;
2128 int err;
2130 err = drbd_send_ack(peer_device, ack, peer_req);
2131 dec_unacked(peer_device->device);
2133 return err;
2136 static int e_send_superseded(struct drbd_work *w, int unused)
2138 return e_send_ack(w, P_SUPERSEDED);
2141 static int e_send_retry_write(struct drbd_work *w, int unused)
2143 struct drbd_peer_request *peer_req =
2144 container_of(w, struct drbd_peer_request, w);
2145 struct drbd_connection *connection = peer_req->peer_device->connection;
2147 return e_send_ack(w, connection->agreed_pro_version >= 100 ?
2148 P_RETRY_WRITE : P_SUPERSEDED);
2151 static bool seq_greater(u32 a, u32 b)
2154 * We assume 32-bit wrap-around here.
2155 * For 24-bit wrap-around, we would have to shift:
2156 * a <<= 8; b <<= 8;
2158 return (s32)a - (s32)b > 0;
2161 static u32 seq_max(u32 a, u32 b)
2163 return seq_greater(a, b) ? a : b;
2166 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
2168 struct drbd_device *device = peer_device->device;
2169 unsigned int newest_peer_seq;
2171 if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
2172 spin_lock(&device->peer_seq_lock);
2173 newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2174 device->peer_seq = newest_peer_seq;
2175 spin_unlock(&device->peer_seq_lock);
2176 /* wake up only if we actually changed device->peer_seq */
2177 if (peer_seq == newest_peer_seq)
2178 wake_up(&device->seq_wait);
2182 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
2184 return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
2187 /* maybe change sync_ee into interval trees as well? */
2188 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2190 struct drbd_peer_request *rs_req;
2191 bool rv = false;
2193 spin_lock_irq(&device->resource->req_lock);
2194 list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2195 if (overlaps(peer_req->i.sector, peer_req->i.size,
2196 rs_req->i.sector, rs_req->i.size)) {
2197 rv = true;
2198 break;
2201 spin_unlock_irq(&device->resource->req_lock);
2203 return rv;
2206 /* Called from receive_Data.
2207 * Synchronize packets on sock with packets on msock.
2209 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
2210 * packet traveling on msock, they are still processed in the order they have
2211 * been sent.
2213 * Note: we don't care for Ack packets overtaking P_DATA packets.
2215 * In case packet_seq is larger than device->peer_seq number, there are
2216 * outstanding packets on the msock. We wait for them to arrive.
2217 * In case we are the logically next packet, we update device->peer_seq
2218 * ourselves. Correctly handles 32bit wrap around.
2220 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2221 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2222 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2223 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
2225 * returns 0 if we may process the packet,
2226 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
2227 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2229 struct drbd_device *device = peer_device->device;
2230 DEFINE_WAIT(wait);
2231 long timeout;
2232 int ret = 0, tp;
2234 if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2235 return 0;
2237 spin_lock(&device->peer_seq_lock);
2238 for (;;) {
2239 if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2240 device->peer_seq = seq_max(device->peer_seq, peer_seq);
2241 break;
2244 if (signal_pending(current)) {
2245 ret = -ERESTARTSYS;
2246 break;
2249 rcu_read_lock();
2250 tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2251 rcu_read_unlock();
2253 if (!tp)
2254 break;
2256 /* Only need to wait if two_primaries is enabled */
2257 prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2258 spin_unlock(&device->peer_seq_lock);
2259 rcu_read_lock();
2260 timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2261 rcu_read_unlock();
2262 timeout = schedule_timeout(timeout);
2263 spin_lock(&device->peer_seq_lock);
2264 if (!timeout) {
2265 ret = -ETIMEDOUT;
2266 drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2267 break;
2270 spin_unlock(&device->peer_seq_lock);
2271 finish_wait(&device->seq_wait, &wait);
2272 return ret;
2275 /* see also bio_flags_to_wire()
2276 * DRBD_REQ_*, because we need to semantically map the flags to data packet
2277 * flags and back. We may replicate to other kernel versions. */
2278 static unsigned long wire_flags_to_bio_flags(u32 dpf)
2280 return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2281 (dpf & DP_FUA ? REQ_FUA : 0) |
2282 (dpf & DP_FLUSH ? REQ_PREFLUSH : 0);
2285 static unsigned long wire_flags_to_bio_op(u32 dpf)
2287 if (dpf & DP_DISCARD)
2288 return REQ_OP_WRITE_ZEROES;
2289 else
2290 return REQ_OP_WRITE;
2293 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2294 unsigned int size)
2296 struct drbd_interval *i;
2298 repeat:
2299 drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2300 struct drbd_request *req;
2301 struct bio_and_error m;
2303 if (!i->local)
2304 continue;
2305 req = container_of(i, struct drbd_request, i);
2306 if (!(req->rq_state & RQ_POSTPONED))
2307 continue;
2308 req->rq_state &= ~RQ_POSTPONED;
2309 __req_mod(req, NEG_ACKED, &m);
2310 spin_unlock_irq(&device->resource->req_lock);
2311 if (m.bio)
2312 complete_master_bio(device, &m);
2313 spin_lock_irq(&device->resource->req_lock);
2314 goto repeat;
2318 static int handle_write_conflicts(struct drbd_device *device,
2319 struct drbd_peer_request *peer_req)
2321 struct drbd_connection *connection = peer_req->peer_device->connection;
2322 bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2323 sector_t sector = peer_req->i.sector;
2324 const unsigned int size = peer_req->i.size;
2325 struct drbd_interval *i;
2326 bool equal;
2327 int err;
2330 * Inserting the peer request into the write_requests tree will prevent
2331 * new conflicting local requests from being added.
2333 drbd_insert_interval(&device->write_requests, &peer_req->i);
2335 repeat:
2336 drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2337 if (i == &peer_req->i)
2338 continue;
2339 if (i->completed)
2340 continue;
2342 if (!i->local) {
2344 * Our peer has sent a conflicting remote request; this
2345 * should not happen in a two-node setup. Wait for the
2346 * earlier peer request to complete.
2348 err = drbd_wait_misc(device, i);
2349 if (err)
2350 goto out;
2351 goto repeat;
2354 equal = i->sector == sector && i->size == size;
2355 if (resolve_conflicts) {
2357 * If the peer request is fully contained within the
2358 * overlapping request, it can be considered overwritten
2359 * and thus superseded; otherwise, it will be retried
2360 * once all overlapping requests have completed.
2362 bool superseded = i->sector <= sector && i->sector +
2363 (i->size >> 9) >= sector + (size >> 9);
2365 if (!equal)
2366 drbd_alert(device, "Concurrent writes detected: "
2367 "local=%llus +%u, remote=%llus +%u, "
2368 "assuming %s came first\n",
2369 (unsigned long long)i->sector, i->size,
2370 (unsigned long long)sector, size,
2371 superseded ? "local" : "remote");
2373 peer_req->w.cb = superseded ? e_send_superseded :
2374 e_send_retry_write;
2375 list_add_tail(&peer_req->w.list, &device->done_ee);
2376 queue_work(connection->ack_sender, &peer_req->peer_device->send_acks_work);
2378 err = -ENOENT;
2379 goto out;
2380 } else {
2381 struct drbd_request *req =
2382 container_of(i, struct drbd_request, i);
2384 if (!equal)
2385 drbd_alert(device, "Concurrent writes detected: "
2386 "local=%llus +%u, remote=%llus +%u\n",
2387 (unsigned long long)i->sector, i->size,
2388 (unsigned long long)sector, size);
2390 if (req->rq_state & RQ_LOCAL_PENDING ||
2391 !(req->rq_state & RQ_POSTPONED)) {
2393 * Wait for the node with the discard flag to
2394 * decide if this request has been superseded
2395 * or needs to be retried.
2396 * Requests that have been superseded will
2397 * disappear from the write_requests tree.
2399 * In addition, wait for the conflicting
2400 * request to finish locally before submitting
2401 * the conflicting peer request.
2403 err = drbd_wait_misc(device, &req->i);
2404 if (err) {
2405 _conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2406 fail_postponed_requests(device, sector, size);
2407 goto out;
2409 goto repeat;
2412 * Remember to restart the conflicting requests after
2413 * the new peer request has completed.
2415 peer_req->flags |= EE_RESTART_REQUESTS;
2418 err = 0;
2420 out:
2421 if (err)
2422 drbd_remove_epoch_entry_interval(device, peer_req);
2423 return err;
2426 /* mirrored write */
2427 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2429 struct drbd_peer_device *peer_device;
2430 struct drbd_device *device;
2431 struct net_conf *nc;
2432 sector_t sector;
2433 struct drbd_peer_request *peer_req;
2434 struct p_data *p = pi->data;
2435 u32 peer_seq = be32_to_cpu(p->seq_num);
2436 int op, op_flags;
2437 u32 dp_flags;
2438 int err, tp;
2440 peer_device = conn_peer_device(connection, pi->vnr);
2441 if (!peer_device)
2442 return -EIO;
2443 device = peer_device->device;
2445 if (!get_ldev(device)) {
2446 int err2;
2448 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2449 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2450 atomic_inc(&connection->current_epoch->epoch_size);
2451 err2 = drbd_drain_block(peer_device, pi->size);
2452 if (!err)
2453 err = err2;
2454 return err;
2458 * Corresponding put_ldev done either below (on various errors), or in
2459 * drbd_peer_request_endio, if we successfully submit the data at the
2460 * end of this function.
2463 sector = be64_to_cpu(p->sector);
2464 peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2465 if (!peer_req) {
2466 put_ldev(device);
2467 return -EIO;
2470 peer_req->w.cb = e_end_block;
2471 peer_req->submit_jif = jiffies;
2472 peer_req->flags |= EE_APPLICATION;
2474 dp_flags = be32_to_cpu(p->dp_flags);
2475 op = wire_flags_to_bio_op(dp_flags);
2476 op_flags = wire_flags_to_bio_flags(dp_flags);
2477 if (pi->cmd == P_TRIM) {
2478 D_ASSERT(peer_device, peer_req->i.size > 0);
2479 D_ASSERT(peer_device, op == REQ_OP_WRITE_ZEROES);
2480 D_ASSERT(peer_device, peer_req->pages == NULL);
2481 } else if (peer_req->pages == NULL) {
2482 D_ASSERT(device, peer_req->i.size == 0);
2483 D_ASSERT(device, dp_flags & DP_FLUSH);
2486 if (dp_flags & DP_MAY_SET_IN_SYNC)
2487 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2489 spin_lock(&connection->epoch_lock);
2490 peer_req->epoch = connection->current_epoch;
2491 atomic_inc(&peer_req->epoch->epoch_size);
2492 atomic_inc(&peer_req->epoch->active);
2493 spin_unlock(&connection->epoch_lock);
2495 rcu_read_lock();
2496 nc = rcu_dereference(peer_device->connection->net_conf);
2497 tp = nc->two_primaries;
2498 if (peer_device->connection->agreed_pro_version < 100) {
2499 switch (nc->wire_protocol) {
2500 case DRBD_PROT_C:
2501 dp_flags |= DP_SEND_WRITE_ACK;
2502 break;
2503 case DRBD_PROT_B:
2504 dp_flags |= DP_SEND_RECEIVE_ACK;
2505 break;
2508 rcu_read_unlock();
2510 if (dp_flags & DP_SEND_WRITE_ACK) {
2511 peer_req->flags |= EE_SEND_WRITE_ACK;
2512 inc_unacked(device);
2513 /* corresponding dec_unacked() in e_end_block()
2514 * respective _drbd_clear_done_ee */
2517 if (dp_flags & DP_SEND_RECEIVE_ACK) {
2518 /* I really don't like it that the receiver thread
2519 * sends on the msock, but anyways */
2520 drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
2523 if (tp) {
2524 /* two primaries implies protocol C */
2525 D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
2526 peer_req->flags |= EE_IN_INTERVAL_TREE;
2527 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2528 if (err)
2529 goto out_interrupted;
2530 spin_lock_irq(&device->resource->req_lock);
2531 err = handle_write_conflicts(device, peer_req);
2532 if (err) {
2533 spin_unlock_irq(&device->resource->req_lock);
2534 if (err == -ENOENT) {
2535 put_ldev(device);
2536 return 0;
2538 goto out_interrupted;
2540 } else {
2541 update_peer_seq(peer_device, peer_seq);
2542 spin_lock_irq(&device->resource->req_lock);
2544 /* TRIM and WRITE_SAME are processed synchronously,
2545 * we wait for all pending requests, respectively wait for
2546 * active_ee to become empty in drbd_submit_peer_request();
2547 * better not add ourselves here. */
2548 if ((peer_req->flags & (EE_IS_TRIM|EE_WRITE_SAME)) == 0)
2549 list_add_tail(&peer_req->w.list, &device->active_ee);
2550 spin_unlock_irq(&device->resource->req_lock);
2552 if (device->state.conn == C_SYNC_TARGET)
2553 wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2555 if (device->state.pdsk < D_INCONSISTENT) {
2556 /* In case we have the only disk of the cluster, */
2557 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2558 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2559 drbd_al_begin_io(device, &peer_req->i);
2560 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2563 err = drbd_submit_peer_request(device, peer_req, op, op_flags,
2564 DRBD_FAULT_DT_WR);
2565 if (!err)
2566 return 0;
2568 /* don't care for the reason here */
2569 drbd_err(device, "submit failed, triggering re-connect\n");
2570 spin_lock_irq(&device->resource->req_lock);
2571 list_del(&peer_req->w.list);
2572 drbd_remove_epoch_entry_interval(device, peer_req);
2573 spin_unlock_irq(&device->resource->req_lock);
2574 if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
2575 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
2576 drbd_al_complete_io(device, &peer_req->i);
2579 out_interrupted:
2580 drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT | EV_CLEANUP);
2581 put_ldev(device);
2582 drbd_free_peer_req(device, peer_req);
2583 return err;
2586 /* We may throttle resync, if the lower device seems to be busy,
2587 * and current sync rate is above c_min_rate.
2589 * To decide whether or not the lower device is busy, we use a scheme similar
2590 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2591 * (more than 64 sectors) of activity we cannot account for with our own resync
2592 * activity, it obviously is "busy".
2594 * The current sync rate used here uses only the most recent two step marks,
2595 * to have a short time average so we can react faster.
2597 bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
2598 bool throttle_if_app_is_waiting)
2600 struct lc_element *tmp;
2601 bool throttle = drbd_rs_c_min_rate_throttle(device);
2603 if (!throttle || throttle_if_app_is_waiting)
2604 return throttle;
2606 spin_lock_irq(&device->al_lock);
2607 tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2608 if (tmp) {
2609 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2610 if (test_bit(BME_PRIORITY, &bm_ext->flags))
2611 throttle = false;
2612 /* Do not slow down if app IO is already waiting for this extent,
2613 * and our progress is necessary for application IO to complete. */
2615 spin_unlock_irq(&device->al_lock);
2617 return throttle;
2620 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2622 struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2623 unsigned long db, dt, dbdt;
2624 unsigned int c_min_rate;
2625 int curr_events;
2627 rcu_read_lock();
2628 c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2629 rcu_read_unlock();
2631 /* feature disabled? */
2632 if (c_min_rate == 0)
2633 return false;
2635 curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2636 (int)part_stat_read(&disk->part0, sectors[1]) -
2637 atomic_read(&device->rs_sect_ev);
2639 if (atomic_read(&device->ap_actlog_cnt)
2640 || curr_events - device->rs_last_events > 64) {
2641 unsigned long rs_left;
2642 int i;
2644 device->rs_last_events = curr_events;
2646 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2647 * approx. */
2648 i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2650 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2651 rs_left = device->ov_left;
2652 else
2653 rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2655 dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2656 if (!dt)
2657 dt++;
2658 db = device->rs_mark_left[i] - rs_left;
2659 dbdt = Bit2KB(db/dt);
2661 if (dbdt > c_min_rate)
2662 return true;
2664 return false;
2667 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2669 struct drbd_peer_device *peer_device;
2670 struct drbd_device *device;
2671 sector_t sector;
2672 sector_t capacity;
2673 struct drbd_peer_request *peer_req;
2674 struct digest_info *di = NULL;
2675 int size, verb;
2676 unsigned int fault_type;
2677 struct p_block_req *p = pi->data;
2679 peer_device = conn_peer_device(connection, pi->vnr);
2680 if (!peer_device)
2681 return -EIO;
2682 device = peer_device->device;
2683 capacity = drbd_get_capacity(device->this_bdev);
2685 sector = be64_to_cpu(p->sector);
2686 size = be32_to_cpu(p->blksize);
2688 if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2689 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2690 (unsigned long long)sector, size);
2691 return -EINVAL;
2693 if (sector + (size>>9) > capacity) {
2694 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2695 (unsigned long long)sector, size);
2696 return -EINVAL;
2699 if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2700 verb = 1;
2701 switch (pi->cmd) {
2702 case P_DATA_REQUEST:
2703 drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2704 break;
2705 case P_RS_THIN_REQ:
2706 case P_RS_DATA_REQUEST:
2707 case P_CSUM_RS_REQUEST:
2708 case P_OV_REQUEST:
2709 drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
2710 break;
2711 case P_OV_REPLY:
2712 verb = 0;
2713 dec_rs_pending(device);
2714 drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2715 break;
2716 default:
2717 BUG();
2719 if (verb && __ratelimit(&drbd_ratelimit_state))
2720 drbd_err(device, "Can not satisfy peer's read request, "
2721 "no local data.\n");
2723 /* drain possibly payload */
2724 return drbd_drain_block(peer_device, pi->size);
2727 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2728 * "criss-cross" setup, that might cause write-out on some other DRBD,
2729 * which in turn might block on the other node at this very place. */
2730 peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2731 size, GFP_NOIO);
2732 if (!peer_req) {
2733 put_ldev(device);
2734 return -ENOMEM;
2737 switch (pi->cmd) {
2738 case P_DATA_REQUEST:
2739 peer_req->w.cb = w_e_end_data_req;
2740 fault_type = DRBD_FAULT_DT_RD;
2741 /* application IO, don't drbd_rs_begin_io */
2742 peer_req->flags |= EE_APPLICATION;
2743 goto submit;
2745 case P_RS_THIN_REQ:
2746 /* If at some point in the future we have a smart way to
2747 find out if this data block is completely deallocated,
2748 then we would do something smarter here than reading
2749 the block... */
2750 peer_req->flags |= EE_RS_THIN_REQ;
2751 case P_RS_DATA_REQUEST:
2752 peer_req->w.cb = w_e_end_rsdata_req;
2753 fault_type = DRBD_FAULT_RS_RD;
2754 /* used in the sector offset progress display */
2755 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2756 break;
2758 case P_OV_REPLY:
2759 case P_CSUM_RS_REQUEST:
2760 fault_type = DRBD_FAULT_RS_RD;
2761 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2762 if (!di)
2763 goto out_free_e;
2765 di->digest_size = pi->size;
2766 di->digest = (((char *)di)+sizeof(struct digest_info));
2768 peer_req->digest = di;
2769 peer_req->flags |= EE_HAS_DIGEST;
2771 if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2772 goto out_free_e;
2774 if (pi->cmd == P_CSUM_RS_REQUEST) {
2775 D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2776 peer_req->w.cb = w_e_end_csum_rs_req;
2777 /* used in the sector offset progress display */
2778 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2779 /* remember to report stats in drbd_resync_finished */
2780 device->use_csums = true;
2781 } else if (pi->cmd == P_OV_REPLY) {
2782 /* track progress, we may need to throttle */
2783 atomic_add(size >> 9, &device->rs_sect_in);
2784 peer_req->w.cb = w_e_end_ov_reply;
2785 dec_rs_pending(device);
2786 /* drbd_rs_begin_io done when we sent this request,
2787 * but accounting still needs to be done. */
2788 goto submit_for_resync;
2790 break;
2792 case P_OV_REQUEST:
2793 if (device->ov_start_sector == ~(sector_t)0 &&
2794 peer_device->connection->agreed_pro_version >= 90) {
2795 unsigned long now = jiffies;
2796 int i;
2797 device->ov_start_sector = sector;
2798 device->ov_position = sector;
2799 device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2800 device->rs_total = device->ov_left;
2801 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2802 device->rs_mark_left[i] = device->ov_left;
2803 device->rs_mark_time[i] = now;
2805 drbd_info(device, "Online Verify start sector: %llu\n",
2806 (unsigned long long)sector);
2808 peer_req->w.cb = w_e_end_ov_req;
2809 fault_type = DRBD_FAULT_RS_RD;
2810 break;
2812 default:
2813 BUG();
2816 /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2817 * wrt the receiver, but it is not as straightforward as it may seem.
2818 * Various places in the resync start and stop logic assume resync
2819 * requests are processed in order, requeuing this on the worker thread
2820 * introduces a bunch of new code for synchronization between threads.
2822 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2823 * "forever", throttling after drbd_rs_begin_io will lock that extent
2824 * for application writes for the same time. For now, just throttle
2825 * here, where the rest of the code expects the receiver to sleep for
2826 * a while, anyways.
2829 /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2830 * this defers syncer requests for some time, before letting at least
2831 * on request through. The resync controller on the receiving side
2832 * will adapt to the incoming rate accordingly.
2834 * We cannot throttle here if remote is Primary/SyncTarget:
2835 * we would also throttle its application reads.
2836 * In that case, throttling is done on the SyncTarget only.
2839 /* Even though this may be a resync request, we do add to "read_ee";
2840 * "sync_ee" is only used for resync WRITEs.
2841 * Add to list early, so debugfs can find this request
2842 * even if we have to sleep below. */
2843 spin_lock_irq(&device->resource->req_lock);
2844 list_add_tail(&peer_req->w.list, &device->read_ee);
2845 spin_unlock_irq(&device->resource->req_lock);
2847 update_receiver_timing_details(connection, drbd_rs_should_slow_down);
2848 if (device->state.peer != R_PRIMARY
2849 && drbd_rs_should_slow_down(device, sector, false))
2850 schedule_timeout_uninterruptible(HZ/10);
2851 update_receiver_timing_details(connection, drbd_rs_begin_io);
2852 if (drbd_rs_begin_io(device, sector))
2853 goto out_free_e;
2855 submit_for_resync:
2856 atomic_add(size >> 9, &device->rs_sect_ev);
2858 submit:
2859 update_receiver_timing_details(connection, drbd_submit_peer_request);
2860 inc_unacked(device);
2861 if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
2862 fault_type) == 0)
2863 return 0;
2865 /* don't care for the reason here */
2866 drbd_err(device, "submit failed, triggering re-connect\n");
2868 out_free_e:
2869 spin_lock_irq(&device->resource->req_lock);
2870 list_del(&peer_req->w.list);
2871 spin_unlock_irq(&device->resource->req_lock);
2872 /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2874 put_ldev(device);
2875 drbd_free_peer_req(device, peer_req);
2876 return -EIO;
2880 * drbd_asb_recover_0p - Recover after split-brain with no remaining primaries
2882 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
2884 struct drbd_device *device = peer_device->device;
2885 int self, peer, rv = -100;
2886 unsigned long ch_self, ch_peer;
2887 enum drbd_after_sb_p after_sb_0p;
2889 self = device->ldev->md.uuid[UI_BITMAP] & 1;
2890 peer = device->p_uuid[UI_BITMAP] & 1;
2892 ch_peer = device->p_uuid[UI_SIZE];
2893 ch_self = device->comm_bm_set;
2895 rcu_read_lock();
2896 after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
2897 rcu_read_unlock();
2898 switch (after_sb_0p) {
2899 case ASB_CONSENSUS:
2900 case ASB_DISCARD_SECONDARY:
2901 case ASB_CALL_HELPER:
2902 case ASB_VIOLENTLY:
2903 drbd_err(device, "Configuration error.\n");
2904 break;
2905 case ASB_DISCONNECT:
2906 break;
2907 case ASB_DISCARD_YOUNGER_PRI:
2908 if (self == 0 && peer == 1) {
2909 rv = -1;
2910 break;
2912 if (self == 1 && peer == 0) {
2913 rv = 1;
2914 break;
2916 /* Else fall through to one of the other strategies... */
2917 case ASB_DISCARD_OLDER_PRI:
2918 if (self == 0 && peer == 1) {
2919 rv = 1;
2920 break;
2922 if (self == 1 && peer == 0) {
2923 rv = -1;
2924 break;
2926 /* Else fall through to one of the other strategies... */
2927 drbd_warn(device, "Discard younger/older primary did not find a decision\n"
2928 "Using discard-least-changes instead\n");
2929 case ASB_DISCARD_ZERO_CHG:
2930 if (ch_peer == 0 && ch_self == 0) {
2931 rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2932 ? -1 : 1;
2933 break;
2934 } else {
2935 if (ch_peer == 0) { rv = 1; break; }
2936 if (ch_self == 0) { rv = -1; break; }
2938 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2939 break;
2940 case ASB_DISCARD_LEAST_CHG:
2941 if (ch_self < ch_peer)
2942 rv = -1;
2943 else if (ch_self > ch_peer)
2944 rv = 1;
2945 else /* ( ch_self == ch_peer ) */
2946 /* Well, then use something else. */
2947 rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2948 ? -1 : 1;
2949 break;
2950 case ASB_DISCARD_LOCAL:
2951 rv = -1;
2952 break;
2953 case ASB_DISCARD_REMOTE:
2954 rv = 1;
2957 return rv;
2961 * drbd_asb_recover_1p - Recover after split-brain with one remaining primary
2963 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
2965 struct drbd_device *device = peer_device->device;
2966 int hg, rv = -100;
2967 enum drbd_after_sb_p after_sb_1p;
2969 rcu_read_lock();
2970 after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
2971 rcu_read_unlock();
2972 switch (after_sb_1p) {
2973 case ASB_DISCARD_YOUNGER_PRI:
2974 case ASB_DISCARD_OLDER_PRI:
2975 case ASB_DISCARD_LEAST_CHG:
2976 case ASB_DISCARD_LOCAL:
2977 case ASB_DISCARD_REMOTE:
2978 case ASB_DISCARD_ZERO_CHG:
2979 drbd_err(device, "Configuration error.\n");
2980 break;
2981 case ASB_DISCONNECT:
2982 break;
2983 case ASB_CONSENSUS:
2984 hg = drbd_asb_recover_0p(peer_device);
2985 if (hg == -1 && device->state.role == R_SECONDARY)
2986 rv = hg;
2987 if (hg == 1 && device->state.role == R_PRIMARY)
2988 rv = hg;
2989 break;
2990 case ASB_VIOLENTLY:
2991 rv = drbd_asb_recover_0p(peer_device);
2992 break;
2993 case ASB_DISCARD_SECONDARY:
2994 return device->state.role == R_PRIMARY ? 1 : -1;
2995 case ASB_CALL_HELPER:
2996 hg = drbd_asb_recover_0p(peer_device);
2997 if (hg == -1 && device->state.role == R_PRIMARY) {
2998 enum drbd_state_rv rv2;
3000 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3001 * we might be here in C_WF_REPORT_PARAMS which is transient.
3002 * we do not need to wait for the after state change work either. */
3003 rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3004 if (rv2 != SS_SUCCESS) {
3005 drbd_khelper(device, "pri-lost-after-sb");
3006 } else {
3007 drbd_warn(device, "Successfully gave up primary role.\n");
3008 rv = hg;
3010 } else
3011 rv = hg;
3014 return rv;
3018 * drbd_asb_recover_2p - Recover after split-brain with two remaining primaries
3020 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
3022 struct drbd_device *device = peer_device->device;
3023 int hg, rv = -100;
3024 enum drbd_after_sb_p after_sb_2p;
3026 rcu_read_lock();
3027 after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
3028 rcu_read_unlock();
3029 switch (after_sb_2p) {
3030 case ASB_DISCARD_YOUNGER_PRI:
3031 case ASB_DISCARD_OLDER_PRI:
3032 case ASB_DISCARD_LEAST_CHG:
3033 case ASB_DISCARD_LOCAL:
3034 case ASB_DISCARD_REMOTE:
3035 case ASB_CONSENSUS:
3036 case ASB_DISCARD_SECONDARY:
3037 case ASB_DISCARD_ZERO_CHG:
3038 drbd_err(device, "Configuration error.\n");
3039 break;
3040 case ASB_VIOLENTLY:
3041 rv = drbd_asb_recover_0p(peer_device);
3042 break;
3043 case ASB_DISCONNECT:
3044 break;
3045 case ASB_CALL_HELPER:
3046 hg = drbd_asb_recover_0p(peer_device);
3047 if (hg == -1) {
3048 enum drbd_state_rv rv2;
3050 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3051 * we might be here in C_WF_REPORT_PARAMS which is transient.
3052 * we do not need to wait for the after state change work either. */
3053 rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3054 if (rv2 != SS_SUCCESS) {
3055 drbd_khelper(device, "pri-lost-after-sb");
3056 } else {
3057 drbd_warn(device, "Successfully gave up primary role.\n");
3058 rv = hg;
3060 } else
3061 rv = hg;
3064 return rv;
3067 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
3068 u64 bits, u64 flags)
3070 if (!uuid) {
3071 drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
3072 return;
3074 drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
3075 text,
3076 (unsigned long long)uuid[UI_CURRENT],
3077 (unsigned long long)uuid[UI_BITMAP],
3078 (unsigned long long)uuid[UI_HISTORY_START],
3079 (unsigned long long)uuid[UI_HISTORY_END],
3080 (unsigned long long)bits,
3081 (unsigned long long)flags);
/*
  100	after split brain try auto recover
    2	C_SYNC_SOURCE set BitMap
    1	C_SYNC_SOURCE use BitMap
    0	no Sync
   -1	C_SYNC_TARGET use BitMap
   -2	C_SYNC_TARGET set BitMap
 -100	after split brain, disconnect
-1000	unrelated data
-1091   requires proto 91
-1096   requires proto 96
 */
3097 static int drbd_uuid_compare(struct drbd_device *const device, enum drbd_role const peer_role, int *rule_nr) __must_hold(local)
3099 struct drbd_peer_device *const peer_device = first_peer_device(device);
3100 struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
3101 u64 self, peer;
3102 int i, j;
3104 self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3105 peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3107 *rule_nr = 10;
3108 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
3109 return 0;
3111 *rule_nr = 20;
3112 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
3113 peer != UUID_JUST_CREATED)
3114 return -2;
3116 *rule_nr = 30;
3117 if (self != UUID_JUST_CREATED &&
3118 (peer == UUID_JUST_CREATED || peer == (u64)0))
3119 return 2;
3121 if (self == peer) {
3122 int rct, dc; /* roles at crash time */
3124 if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
3126 if (connection->agreed_pro_version < 91)
3127 return -1091;
3129 if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
3130 (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
3131 drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
3132 drbd_uuid_move_history(device);
3133 device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
3134 device->ldev->md.uuid[UI_BITMAP] = 0;
3136 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3137 device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3138 *rule_nr = 34;
3139 } else {
3140 drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
3141 *rule_nr = 36;
3144 return 1;
3147 if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
3149 if (connection->agreed_pro_version < 91)
3150 return -1091;
3152 if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
3153 (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
3154 drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
3156 device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
3157 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
3158 device->p_uuid[UI_BITMAP] = 0UL;
3160 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3161 *rule_nr = 35;
3162 } else {
3163 drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
3164 *rule_nr = 37;
3167 return -1;
3170 /* Common power [off|failure] */
3171 rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
3172 (device->p_uuid[UI_FLAGS] & 2);
3173 /* lowest bit is set when we were primary,
3174 * next bit (weight 2) is set when peer was primary */
3175 *rule_nr = 40;
3177 /* Neither has the "crashed primary" flag set,
3178 * only a replication link hickup. */
3179 if (rct == 0)
3180 return 0;
3182 /* Current UUID equal and no bitmap uuid; does not necessarily
3183 * mean this was a "simultaneous hard crash", maybe IO was
3184 * frozen, so no UUID-bump happened.
3185 * This is a protocol change, overload DRBD_FF_WSAME as flag
3186 * for "new-enough" peer DRBD version. */
3187 if (device->state.role == R_PRIMARY || peer_role == R_PRIMARY) {
3188 *rule_nr = 41;
3189 if (!(connection->agreed_features & DRBD_FF_WSAME)) {
3190 drbd_warn(peer_device, "Equivalent unrotated UUIDs, but current primary present.\n");
3191 return -(0x10000 | PRO_VERSION_MAX | (DRBD_FF_WSAME << 8));
3193 if (device->state.role == R_PRIMARY && peer_role == R_PRIMARY) {
3194 /* At least one has the "crashed primary" bit set,
3195 * both are primary now, but neither has rotated its UUIDs?
3196 * "Can not happen." */
3197 drbd_err(peer_device, "Equivalent unrotated UUIDs, but both are primary. Can not resolve this.\n");
3198 return -100;
3200 if (device->state.role == R_PRIMARY)
3201 return 1;
3202 return -1;
3205 /* Both are secondary.
3206 * Really looks like recovery from simultaneous hard crash.
3207 * Check which had been primary before, and arbitrate. */
3208 switch (rct) {
3209 case 0: /* !self_pri && !peer_pri */ return 0; /* already handled */
3210 case 1: /* self_pri && !peer_pri */ return 1;
3211 case 2: /* !self_pri && peer_pri */ return -1;
3212 case 3: /* self_pri && peer_pri */
3213 dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
3214 return dc ? -1 : 1;
3218 *rule_nr = 50;
3219 peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3220 if (self == peer)
3221 return -1;
3223 *rule_nr = 51;
3224 peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
3225 if (self == peer) {
3226 if (connection->agreed_pro_version < 96 ?
3227 (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
3228 (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
3229 peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
3230 /* The last P_SYNC_UUID did not get though. Undo the last start of
3231 resync as sync source modifications of the peer's UUIDs. */
3233 if (connection->agreed_pro_version < 91)
3234 return -1091;
3236 device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
3237 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
3239 drbd_info(device, "Lost last syncUUID packet, corrected:\n");
3240 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3242 return -1;
3246 *rule_nr = 60;
3247 self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3248 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3249 peer = device->p_uuid[i] & ~((u64)1);
3250 if (self == peer)
3251 return -2;
3254 *rule_nr = 70;
3255 self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3256 peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3257 if (self == peer)
3258 return 1;
3260 *rule_nr = 71;
3261 self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
3262 if (self == peer) {
3263 if (connection->agreed_pro_version < 96 ?
3264 (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
3265 (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
3266 self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
3267 /* The last P_SYNC_UUID did not get though. Undo the last start of
3268 resync as sync source modifications of our UUIDs. */
3270 if (connection->agreed_pro_version < 91)
3271 return -1091;
3273 __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
3274 __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
3276 drbd_info(device, "Last syncUUID did not get through, corrected:\n");
3277 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3278 device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3280 return 1;
3285 *rule_nr = 80;
3286 peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3287 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3288 self = device->ldev->md.uuid[i] & ~((u64)1);
3289 if (self == peer)
3290 return 2;
3293 *rule_nr = 90;
3294 self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3295 peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3296 if (self == peer && self != ((u64)0))
3297 return 100;
3299 *rule_nr = 100;
3300 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3301 self = device->ldev->md.uuid[i] & ~((u64)1);
3302 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3303 peer = device->p_uuid[j] & ~((u64)1);
3304 if (self == peer)
3305 return -100;
3309 return -1000;
3312 /* drbd_sync_handshake() returns the new conn state on success, or
3313 CONN_MASK (-1) on failure.
3315 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3316 enum drbd_role peer_role,
3317 enum drbd_disk_state peer_disk) __must_hold(local)
3319 struct drbd_device *device = peer_device->device;
3320 enum drbd_conns rv = C_MASK;
3321 enum drbd_disk_state mydisk;
3322 struct net_conf *nc;
3323 int hg, rule_nr, rr_conflict, tentative;
3325 mydisk = device->state.disk;
3326 if (mydisk == D_NEGOTIATING)
3327 mydisk = device->new_state_tmp.disk;
3329 drbd_info(device, "drbd_sync_handshake:\n");
3331 spin_lock_irq(&device->ldev->md.uuid_lock);
3332 drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3333 drbd_uuid_dump(device, "peer", device->p_uuid,
3334 device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3336 hg = drbd_uuid_compare(device, peer_role, &rule_nr);
3337 spin_unlock_irq(&device->ldev->md.uuid_lock);
3339 drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3341 if (hg == -1000) {
3342 drbd_alert(device, "Unrelated data, aborting!\n");
3343 return C_MASK;
3345 if (hg < -0x10000) {
3346 int proto, fflags;
3347 hg = -hg;
3348 proto = hg & 0xff;
3349 fflags = (hg >> 8) & 0xff;
3350 drbd_alert(device, "To resolve this both sides have to support at least protocol %d and feature flags 0x%x\n",
3351 proto, fflags);
3352 return C_MASK;
3354 if (hg < -1000) {
3355 drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3356 return C_MASK;
3359 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3360 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
3361 int f = (hg == -100) || abs(hg) == 2;
3362 hg = mydisk > D_INCONSISTENT ? 1 : -1;
3363 if (f)
3364 hg = hg*2;
3365 drbd_info(device, "Becoming sync %s due to disk states.\n",
3366 hg > 0 ? "source" : "target");
3369 if (abs(hg) == 100)
3370 drbd_khelper(device, "initial-split-brain");
3372 rcu_read_lock();
3373 nc = rcu_dereference(peer_device->connection->net_conf);
3375 if (hg == 100 || (hg == -100 && nc->always_asbp)) {
3376 int pcount = (device->state.role == R_PRIMARY)
3377 + (peer_role == R_PRIMARY);
3378 int forced = (hg == -100);
3380 switch (pcount) {
3381 case 0:
3382 hg = drbd_asb_recover_0p(peer_device);
3383 break;
3384 case 1:
3385 hg = drbd_asb_recover_1p(peer_device);
3386 break;
3387 case 2:
3388 hg = drbd_asb_recover_2p(peer_device);
3389 break;
3391 if (abs(hg) < 100) {
3392 drbd_warn(device, "Split-Brain detected, %d primaries, "
3393 "automatically solved. Sync from %s node\n",
3394 pcount, (hg < 0) ? "peer" : "this");
3395 if (forced) {
3396 drbd_warn(device, "Doing a full sync, since"
3397 " UUIDs where ambiguous.\n");
3398 hg = hg*2;
3403 if (hg == -100) {
3404 if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3405 hg = -1;
3406 if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3407 hg = 1;
3409 if (abs(hg) < 100)
3410 drbd_warn(device, "Split-Brain detected, manually solved. "
3411 "Sync from %s node\n",
3412 (hg < 0) ? "peer" : "this");
3414 rr_conflict = nc->rr_conflict;
3415 tentative = nc->tentative;
3416 rcu_read_unlock();
3418 if (hg == -100) {
3419 /* FIXME this log message is not correct if we end up here
3420 * after an attempted attach on a diskless node.
3421 * We just refuse to attach -- well, we drop the "connection"
3422 * to that disk, in a way... */
3423 drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3424 drbd_khelper(device, "split-brain");
3425 return C_MASK;
3428 if (hg > 0 && mydisk <= D_INCONSISTENT) {
3429 drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3430 return C_MASK;
3433 if (hg < 0 && /* by intention we do not use mydisk here. */
3434 device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3435 switch (rr_conflict) {
3436 case ASB_CALL_HELPER:
3437 drbd_khelper(device, "pri-lost");
3438 /* fall through */
3439 case ASB_DISCONNECT:
3440 drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3441 return C_MASK;
3442 case ASB_VIOLENTLY:
3443 drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
3444 "assumption\n");
3448 if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3449 if (hg == 0)
3450 drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3451 else
3452 drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
3453 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3454 abs(hg) >= 2 ? "full" : "bit-map based");
3455 return C_MASK;
3458 if (abs(hg) >= 2) {
3459 drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3460 if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3461 BM_LOCKED_SET_ALLOWED))
3462 return C_MASK;
3465 if (hg > 0) { /* become sync source. */
3466 rv = C_WF_BITMAP_S;
3467 } else if (hg < 0) { /* become sync target */
3468 rv = C_WF_BITMAP_T;
3469 } else {
3470 rv = C_CONNECTED;
3471 if (drbd_bm_total_weight(device)) {
3472 drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3473 drbd_bm_total_weight(device));
3477 return rv;
3480 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3482 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3483 if (peer == ASB_DISCARD_REMOTE)
3484 return ASB_DISCARD_LOCAL;
3486 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3487 if (peer == ASB_DISCARD_LOCAL)
3488 return ASB_DISCARD_REMOTE;
3490 /* everything else is valid if they are equal on both sides. */
3491 return peer;
3494 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3496 struct p_protocol *p = pi->data;
3497 enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3498 int p_proto, p_discard_my_data, p_two_primaries, cf;
3499 struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3500 char integrity_alg[SHARED_SECRET_MAX] = "";
3501 struct crypto_ahash *peer_integrity_tfm = NULL;
3502 void *int_dig_in = NULL, *int_dig_vv = NULL;
3504 p_proto = be32_to_cpu(p->protocol);
3505 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
3506 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
3507 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
3508 p_two_primaries = be32_to_cpu(p->two_primaries);
3509 cf = be32_to_cpu(p->conn_flags);
3510 p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3512 if (connection->agreed_pro_version >= 87) {
3513 int err;
3515 if (pi->size > sizeof(integrity_alg))
3516 return -EIO;
3517 err = drbd_recv_all(connection, integrity_alg, pi->size);
3518 if (err)
3519 return err;
3520 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3523 if (pi->cmd != P_PROTOCOL_UPDATE) {
3524 clear_bit(CONN_DRY_RUN, &connection->flags);
3526 if (cf & CF_DRY_RUN)
3527 set_bit(CONN_DRY_RUN, &connection->flags);
3529 rcu_read_lock();
3530 nc = rcu_dereference(connection->net_conf);
3532 if (p_proto != nc->wire_protocol) {
3533 drbd_err(connection, "incompatible %s settings\n", "protocol");
3534 goto disconnect_rcu_unlock;
3537 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3538 drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3539 goto disconnect_rcu_unlock;
3542 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3543 drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3544 goto disconnect_rcu_unlock;
3547 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3548 drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3549 goto disconnect_rcu_unlock;
3552 if (p_discard_my_data && nc->discard_my_data) {
3553 drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3554 goto disconnect_rcu_unlock;
3557 if (p_two_primaries != nc->two_primaries) {
3558 drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3559 goto disconnect_rcu_unlock;
3562 if (strcmp(integrity_alg, nc->integrity_alg)) {
3563 drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3564 goto disconnect_rcu_unlock;
3567 rcu_read_unlock();
3570 if (integrity_alg[0]) {
3571 int hash_size;
3574 * We can only change the peer data integrity algorithm
3575 * here. Changing our own data integrity algorithm
3576 * requires that we send a P_PROTOCOL_UPDATE packet at
3577 * the same time; otherwise, the peer has no way to
3578 * tell between which packets the algorithm should
3579 * change.
3582 peer_integrity_tfm = crypto_alloc_ahash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3583 if (IS_ERR(peer_integrity_tfm)) {
3584 peer_integrity_tfm = NULL;
3585 drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3586 integrity_alg);
3587 goto disconnect;
3590 hash_size = crypto_ahash_digestsize(peer_integrity_tfm);
3591 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3592 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3593 if (!(int_dig_in && int_dig_vv)) {
3594 drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3595 goto disconnect;
3599 new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3600 if (!new_net_conf) {
3601 drbd_err(connection, "Allocation of new net_conf failed\n");
3602 goto disconnect;
3605 mutex_lock(&connection->data.mutex);
3606 mutex_lock(&connection->resource->conf_update);
3607 old_net_conf = connection->net_conf;
3608 *new_net_conf = *old_net_conf;
3610 new_net_conf->wire_protocol = p_proto;
3611 new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3612 new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3613 new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3614 new_net_conf->two_primaries = p_two_primaries;
3616 rcu_assign_pointer(connection->net_conf, new_net_conf);
3617 mutex_unlock(&connection->resource->conf_update);
3618 mutex_unlock(&connection->data.mutex);
3620 crypto_free_ahash(connection->peer_integrity_tfm);
3621 kfree(connection->int_dig_in);
3622 kfree(connection->int_dig_vv);
3623 connection->peer_integrity_tfm = peer_integrity_tfm;
3624 connection->int_dig_in = int_dig_in;
3625 connection->int_dig_vv = int_dig_vv;
3627 if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3628 drbd_info(connection, "peer data-integrity-alg: %s\n",
3629 integrity_alg[0] ? integrity_alg : "(none)");
3631 synchronize_rcu();
3632 kfree(old_net_conf);
3633 return 0;
3635 disconnect_rcu_unlock:
3636 rcu_read_unlock();
3637 disconnect:
3638 crypto_free_ahash(peer_integrity_tfm);
3639 kfree(int_dig_in);
3640 kfree(int_dig_vv);
3641 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3642 return -EIO;
3645 /* helper function
3646 * input: alg name, feature name
3647 * return: NULL (alg name was "")
3648 * ERR_PTR(error) if something goes wrong
3649 * or the crypto hash ptr, if it worked out ok. */
3650 static struct crypto_ahash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
3651 const char *alg, const char *name)
3653 struct crypto_ahash *tfm;
3655 if (!alg[0])
3656 return NULL;
3658 tfm = crypto_alloc_ahash(alg, 0, CRYPTO_ALG_ASYNC);
3659 if (IS_ERR(tfm)) {
3660 drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3661 alg, name, PTR_ERR(tfm));
3662 return tfm;
3664 return tfm;
3667 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3669 void *buffer = connection->data.rbuf;
3670 int size = pi->size;
3672 while (size) {
3673 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3674 s = drbd_recv(connection, buffer, s);
3675 if (s <= 0) {
3676 if (s < 0)
3677 return s;
3678 break;
3680 size -= s;
3682 if (size)
3683 return -EIO;
3684 return 0;
3688 * config_unknown_volume - device configuration command for unknown volume
3690 * When a device is added to an existing connection, the node on which the
3691 * device is added first will send configuration commands to its peer but the
3692 * peer will not know about the device yet. It will warn and ignore these
3693 * commands. Once the device is added on the second node, the second node will
3694 * send the same device configuration commands, but in the other direction.
3696 * (We can also end up here if drbd is misconfigured.)
3698 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3700 drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3701 cmdname(pi->cmd), pi->vnr);
3702 return ignore_remaining_packet(connection, pi);
3705 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3707 struct drbd_peer_device *peer_device;
3708 struct drbd_device *device;
3709 struct p_rs_param_95 *p;
3710 unsigned int header_size, data_size, exp_max_sz;
3711 struct crypto_ahash *verify_tfm = NULL;
3712 struct crypto_ahash *csums_tfm = NULL;
3713 struct net_conf *old_net_conf, *new_net_conf = NULL;
3714 struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3715 const int apv = connection->agreed_pro_version;
3716 struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3717 int fifo_size = 0;
3718 int err;
3720 peer_device = conn_peer_device(connection, pi->vnr);
3721 if (!peer_device)
3722 return config_unknown_volume(connection, pi);
3723 device = peer_device->device;
3725 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
3726 : apv == 88 ? sizeof(struct p_rs_param)
3727 + SHARED_SECRET_MAX
3728 : apv <= 94 ? sizeof(struct p_rs_param_89)
3729 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3731 if (pi->size > exp_max_sz) {
3732 drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3733 pi->size, exp_max_sz);
3734 return -EIO;
3737 if (apv <= 88) {
3738 header_size = sizeof(struct p_rs_param);
3739 data_size = pi->size - header_size;
3740 } else if (apv <= 94) {
3741 header_size = sizeof(struct p_rs_param_89);
3742 data_size = pi->size - header_size;
3743 D_ASSERT(device, data_size == 0);
3744 } else {
3745 header_size = sizeof(struct p_rs_param_95);
3746 data_size = pi->size - header_size;
3747 D_ASSERT(device, data_size == 0);
3750 /* initialize verify_alg and csums_alg */
3751 p = pi->data;
3752 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3754 err = drbd_recv_all(peer_device->connection, p, header_size);
3755 if (err)
3756 return err;
3758 mutex_lock(&connection->resource->conf_update);
3759 old_net_conf = peer_device->connection->net_conf;
3760 if (get_ldev(device)) {
3761 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3762 if (!new_disk_conf) {
3763 put_ldev(device);
3764 mutex_unlock(&connection->resource->conf_update);
3765 drbd_err(device, "Allocation of new disk_conf failed\n");
3766 return -ENOMEM;
3769 old_disk_conf = device->ldev->disk_conf;
3770 *new_disk_conf = *old_disk_conf;
3772 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3775 if (apv >= 88) {
3776 if (apv == 88) {
3777 if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3778 drbd_err(device, "verify-alg of wrong size, "
3779 "peer wants %u, accepting only up to %u byte\n",
3780 data_size, SHARED_SECRET_MAX);
3781 err = -EIO;
3782 goto reconnect;
3785 err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3786 if (err)
3787 goto reconnect;
3788 /* we expect NUL terminated string */
3789 /* but just in case someone tries to be evil */
3790 D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3791 p->verify_alg[data_size-1] = 0;
3793 } else /* apv >= 89 */ {
3794 /* we still expect NUL terminated strings */
3795 /* but just in case someone tries to be evil */
3796 D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3797 D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3798 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3799 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3802 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3803 if (device->state.conn == C_WF_REPORT_PARAMS) {
3804 drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3805 old_net_conf->verify_alg, p->verify_alg);
3806 goto disconnect;
3808 verify_tfm = drbd_crypto_alloc_digest_safe(device,
3809 p->verify_alg, "verify-alg");
3810 if (IS_ERR(verify_tfm)) {
3811 verify_tfm = NULL;
3812 goto disconnect;
3816 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3817 if (device->state.conn == C_WF_REPORT_PARAMS) {
3818 drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3819 old_net_conf->csums_alg, p->csums_alg);
3820 goto disconnect;
3822 csums_tfm = drbd_crypto_alloc_digest_safe(device,
3823 p->csums_alg, "csums-alg");
3824 if (IS_ERR(csums_tfm)) {
3825 csums_tfm = NULL;
3826 goto disconnect;
3830 if (apv > 94 && new_disk_conf) {
3831 new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3832 new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3833 new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3834 new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3836 fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3837 if (fifo_size != device->rs_plan_s->size) {
3838 new_plan = fifo_alloc(fifo_size);
3839 if (!new_plan) {
3840 drbd_err(device, "kmalloc of fifo_buffer failed");
3841 put_ldev(device);
3842 goto disconnect;
3847 if (verify_tfm || csums_tfm) {
3848 new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3849 if (!new_net_conf) {
3850 drbd_err(device, "Allocation of new net_conf failed\n");
3851 goto disconnect;
3854 *new_net_conf = *old_net_conf;
3856 if (verify_tfm) {
3857 strcpy(new_net_conf->verify_alg, p->verify_alg);
3858 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3859 crypto_free_ahash(peer_device->connection->verify_tfm);
3860 peer_device->connection->verify_tfm = verify_tfm;
3861 drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3863 if (csums_tfm) {
3864 strcpy(new_net_conf->csums_alg, p->csums_alg);
3865 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3866 crypto_free_ahash(peer_device->connection->csums_tfm);
3867 peer_device->connection->csums_tfm = csums_tfm;
3868 drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3870 rcu_assign_pointer(connection->net_conf, new_net_conf);
3874 if (new_disk_conf) {
3875 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3876 put_ldev(device);
3879 if (new_plan) {
3880 old_plan = device->rs_plan_s;
3881 rcu_assign_pointer(device->rs_plan_s, new_plan);
3884 mutex_unlock(&connection->resource->conf_update);
3885 synchronize_rcu();
3886 if (new_net_conf)
3887 kfree(old_net_conf);
3888 kfree(old_disk_conf);
3889 kfree(old_plan);
3891 return 0;
3893 reconnect:
3894 if (new_disk_conf) {
3895 put_ldev(device);
3896 kfree(new_disk_conf);
3898 mutex_unlock(&connection->resource->conf_update);
3899 return -EIO;
3901 disconnect:
3902 kfree(new_plan);
3903 if (new_disk_conf) {
3904 put_ldev(device);
3905 kfree(new_disk_conf);
3907 mutex_unlock(&connection->resource->conf_update);
3908 /* just for completeness: actually not needed,
3909 * as this is not reached if csums_tfm was ok. */
3910 crypto_free_ahash(csums_tfm);
3911 /* but free the verify_tfm again, if csums_tfm did not work out */
3912 crypto_free_ahash(verify_tfm);
3913 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3914 return -EIO;
3917 /* warn if the arguments differ by more than 12.5% */
3918 static void warn_if_differ_considerably(struct drbd_device *device,
3919 const char *s, sector_t a, sector_t b)
3921 sector_t d;
3922 if (a == 0 || b == 0)
3923 return;
3924 d = (a > b) ? (a - b) : (b - a);
3925 if (d > (a>>3) || d > (b>>3))
3926 drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
3927 (unsigned long long)a, (unsigned long long)b);
3930 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
3932 struct drbd_peer_device *peer_device;
3933 struct drbd_device *device;
3934 struct p_sizes *p = pi->data;
3935 struct o_qlim *o = (connection->agreed_features & DRBD_FF_WSAME) ? p->qlim : NULL;
3936 enum determine_dev_size dd = DS_UNCHANGED;
3937 sector_t p_size, p_usize, p_csize, my_usize;
3938 int ldsc = 0; /* local disk size changed */
3939 enum dds_flags ddsf;
3941 peer_device = conn_peer_device(connection, pi->vnr);
3942 if (!peer_device)
3943 return config_unknown_volume(connection, pi);
3944 device = peer_device->device;
3946 p_size = be64_to_cpu(p->d_size);
3947 p_usize = be64_to_cpu(p->u_size);
3948 p_csize = be64_to_cpu(p->c_size);
3950 /* just store the peer's disk size for now.
3951 * we still need to figure out whether we accept that. */
3952 device->p_size = p_size;
3954 if (get_ldev(device)) {
3955 sector_t new_size, cur_size;
3956 rcu_read_lock();
3957 my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
3958 rcu_read_unlock();
3960 warn_if_differ_considerably(device, "lower level device sizes",
3961 p_size, drbd_get_max_capacity(device->ldev));
3962 warn_if_differ_considerably(device, "user requested size",
3963 p_usize, my_usize);
3965 /* if this is the first connect, or an otherwise expected
3966 * param exchange, choose the minimum */
3967 if (device->state.conn == C_WF_REPORT_PARAMS)
3968 p_usize = min_not_zero(my_usize, p_usize);
3970 /* Never shrink a device with usable data during connect.
3971 But allow online shrinking if we are connected. */
3972 new_size = drbd_new_dev_size(device, device->ldev, p_usize, 0);
3973 cur_size = drbd_get_capacity(device->this_bdev);
3974 if (new_size < cur_size &&
3975 device->state.disk >= D_OUTDATED &&
3976 device->state.conn < C_CONNECTED) {
3977 drbd_err(device, "The peer's disk size is too small! (%llu < %llu sectors)\n",
3978 (unsigned long long)new_size, (unsigned long long)cur_size);
3979 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3980 put_ldev(device);
3981 return -EIO;
3984 if (my_usize != p_usize) {
3985 struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3987 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3988 if (!new_disk_conf) {
3989 drbd_err(device, "Allocation of new disk_conf failed\n");
3990 put_ldev(device);
3991 return -ENOMEM;
3994 mutex_lock(&connection->resource->conf_update);
3995 old_disk_conf = device->ldev->disk_conf;
3996 *new_disk_conf = *old_disk_conf;
3997 new_disk_conf->disk_size = p_usize;
3999 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
4000 mutex_unlock(&connection->resource->conf_update);
4001 synchronize_rcu();
4002 kfree(old_disk_conf);
4004 drbd_info(device, "Peer sets u_size to %lu sectors\n",
4005 (unsigned long)my_usize);
4008 put_ldev(device);
4011 device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
4012 /* Leave drbd_reconsider_queue_parameters() before drbd_determine_dev_size().
4013 In case we cleared the QUEUE_FLAG_DISCARD from our queue in
4014 drbd_reconsider_queue_parameters(), we can be sure that after
4015 drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
4017 ddsf = be16_to_cpu(p->dds_flags);
4018 if (get_ldev(device)) {
4019 drbd_reconsider_queue_parameters(device, device->ldev, o);
4020 dd = drbd_determine_dev_size(device, ddsf, NULL);
4021 put_ldev(device);
4022 if (dd == DS_ERROR)
4023 return -EIO;
4024 drbd_md_sync(device);
4025 } else {
4027 * I am diskless, need to accept the peer's *current* size.
4028 * I must NOT accept the peers backing disk size,
4029 * it may have been larger than mine all along...
4031 * At this point, the peer knows more about my disk, or at
4032 * least about what we last agreed upon, than myself.
4033 * So if his c_size is less than his d_size, the most likely
4034 * reason is that *my* d_size was smaller last time we checked.
4036 * However, if he sends a zero current size,
4037 * take his (user-capped or) backing disk size anyways.
4039 drbd_reconsider_queue_parameters(device, NULL, o);
4040 drbd_set_my_capacity(device, p_csize ?: p_usize ?: p_size);
4043 if (get_ldev(device)) {
4044 if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
4045 device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
4046 ldsc = 1;
4049 put_ldev(device);
4052 if (device->state.conn > C_WF_REPORT_PARAMS) {
4053 if (be64_to_cpu(p->c_size) !=
4054 drbd_get_capacity(device->this_bdev) || ldsc) {
4055 /* we have different sizes, probably peer
4056 * needs to know my new size... */
4057 drbd_send_sizes(peer_device, 0, ddsf);
4059 if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
4060 (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
4061 if (device->state.pdsk >= D_INCONSISTENT &&
4062 device->state.disk >= D_INCONSISTENT) {
4063 if (ddsf & DDSF_NO_RESYNC)
4064 drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
4065 else
4066 resync_after_online_grow(device);
4067 } else
4068 set_bit(RESYNC_AFTER_NEG, &device->flags);
4072 return 0;
4075 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
4077 struct drbd_peer_device *peer_device;
4078 struct drbd_device *device;
4079 struct p_uuids *p = pi->data;
4080 u64 *p_uuid;
4081 int i, updated_uuids = 0;
4083 peer_device = conn_peer_device(connection, pi->vnr);
4084 if (!peer_device)
4085 return config_unknown_volume(connection, pi);
4086 device = peer_device->device;
4088 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
4089 if (!p_uuid) {
4090 drbd_err(device, "kmalloc of p_uuid failed\n");
4091 return false;
4094 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
4095 p_uuid[i] = be64_to_cpu(p->uuid[i]);
4097 kfree(device->p_uuid);
4098 device->p_uuid = p_uuid;
4100 if (device->state.conn < C_CONNECTED &&
4101 device->state.disk < D_INCONSISTENT &&
4102 device->state.role == R_PRIMARY &&
4103 (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
4104 drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
4105 (unsigned long long)device->ed_uuid);
4106 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4107 return -EIO;
4110 if (get_ldev(device)) {
4111 int skip_initial_sync =
4112 device->state.conn == C_CONNECTED &&
4113 peer_device->connection->agreed_pro_version >= 90 &&
4114 device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
4115 (p_uuid[UI_FLAGS] & 8);
4116 if (skip_initial_sync) {
4117 drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
4118 drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
4119 "clear_n_write from receive_uuids",
4120 BM_LOCKED_TEST_ALLOWED);
4121 _drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
4122 _drbd_uuid_set(device, UI_BITMAP, 0);
4123 _drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
4124 CS_VERBOSE, NULL);
4125 drbd_md_sync(device);
4126 updated_uuids = 1;
4128 put_ldev(device);
4129 } else if (device->state.disk < D_INCONSISTENT &&
4130 device->state.role == R_PRIMARY) {
4131 /* I am a diskless primary, the peer just created a new current UUID
4132 for me. */
4133 updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4136 /* Before we test for the disk state, we should wait until an eventually
4137 ongoing cluster wide state change is finished. That is important if
4138 we are primary and are detaching from our disk. We need to see the
4139 new disk state... */
4140 mutex_lock(device->state_mutex);
4141 mutex_unlock(device->state_mutex);
4142 if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
4143 updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4145 if (updated_uuids)
4146 drbd_print_uuids(device, "receiver updated UUIDs to");
4148 return 0;
4152 * convert_state() - Converts the peer's view of the cluster state to our point of view
4153 * @ps: The state as seen by the peer.
4155 static union drbd_state convert_state(union drbd_state ps)
4157 union drbd_state ms;
4159 static enum drbd_conns c_tab[] = {
4160 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
4161 [C_CONNECTED] = C_CONNECTED,
4163 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
4164 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
4165 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
4166 [C_VERIFY_S] = C_VERIFY_T,
4167 [C_MASK] = C_MASK,
4170 ms.i = ps.i;
4172 ms.conn = c_tab[ps.conn];
4173 ms.peer = ps.role;
4174 ms.role = ps.peer;
4175 ms.pdsk = ps.disk;
4176 ms.disk = ps.pdsk;
4177 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
4179 return ms;
4182 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
4184 struct drbd_peer_device *peer_device;
4185 struct drbd_device *device;
4186 struct p_req_state *p = pi->data;
4187 union drbd_state mask, val;
4188 enum drbd_state_rv rv;
4190 peer_device = conn_peer_device(connection, pi->vnr);
4191 if (!peer_device)
4192 return -EIO;
4193 device = peer_device->device;
4195 mask.i = be32_to_cpu(p->mask);
4196 val.i = be32_to_cpu(p->val);
4198 if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
4199 mutex_is_locked(device->state_mutex)) {
4200 drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
4201 return 0;
4204 mask = convert_state(mask);
4205 val = convert_state(val);
4207 rv = drbd_change_state(device, CS_VERBOSE, mask, val);
4208 drbd_send_sr_reply(peer_device, rv);
4210 drbd_md_sync(device);
4212 return 0;
4215 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
4217 struct p_req_state *p = pi->data;
4218 union drbd_state mask, val;
4219 enum drbd_state_rv rv;
4221 mask.i = be32_to_cpu(p->mask);
4222 val.i = be32_to_cpu(p->val);
4224 if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4225 mutex_is_locked(&connection->cstate_mutex)) {
4226 conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
4227 return 0;
4230 mask = convert_state(mask);
4231 val = convert_state(val);
4233 rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4234 conn_send_sr_reply(connection, rv);
4236 return 0;
4239 static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
4241 struct drbd_peer_device *peer_device;
4242 struct drbd_device *device;
4243 struct p_state *p = pi->data;
4244 union drbd_state os, ns, peer_state;
4245 enum drbd_disk_state real_peer_disk;
4246 enum chg_state_flags cs_flags;
4247 int rv;
4249 peer_device = conn_peer_device(connection, pi->vnr);
4250 if (!peer_device)
4251 return config_unknown_volume(connection, pi);
4252 device = peer_device->device;
4254 peer_state.i = be32_to_cpu(p->state);
4256 real_peer_disk = peer_state.disk;
4257 if (peer_state.disk == D_NEGOTIATING) {
4258 real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
4259 drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
4262 spin_lock_irq(&device->resource->req_lock);
4263 retry:
4264 os = ns = drbd_read_state(device);
4265 spin_unlock_irq(&device->resource->req_lock);
4267 /* If some other part of the code (ack_receiver thread, timeout)
4268 * already decided to close the connection again,
4269 * we must not "re-establish" it here. */
4270 if (os.conn <= C_TEAR_DOWN)
4271 return -ECONNRESET;
4273 /* If this is the "end of sync" confirmation, usually the peer disk
4274 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
4275 * set) resync started in PausedSyncT, or if the timing of pause-/
4276 * unpause-sync events has been "just right", the peer disk may
4277 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
4279 if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
4280 real_peer_disk == D_UP_TO_DATE &&
4281 os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
4282 /* If we are (becoming) SyncSource, but peer is still in sync
4283 * preparation, ignore its uptodate-ness to avoid flapping, it
4284 * will change to inconsistent once the peer reaches active
4285 * syncing states.
4286 * It may have changed syncer-paused flags, however, so we
4287 * cannot ignore this completely. */
4288 if (peer_state.conn > C_CONNECTED &&
4289 peer_state.conn < C_SYNC_SOURCE)
4290 real_peer_disk = D_INCONSISTENT;
4292 /* if peer_state changes to connected at the same time,
4293 * it explicitly notifies us that it finished resync.
4294 * Maybe we should finish it up, too? */
4295 else if (os.conn >= C_SYNC_SOURCE &&
4296 peer_state.conn == C_CONNECTED) {
4297 if (drbd_bm_total_weight(device) <= device->rs_failed)
4298 drbd_resync_finished(device);
4299 return 0;
4303 /* explicit verify finished notification, stop sector reached. */
4304 if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
4305 peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
4306 ov_out_of_sync_print(device);
4307 drbd_resync_finished(device);
4308 return 0;
4311 /* peer says his disk is inconsistent, while we think it is uptodate,
4312 * and this happens while the peer still thinks we have a sync going on,
4313 * but we think we are already done with the sync.
4314 * We ignore this to avoid flapping pdsk.
4315 * This should not happen, if the peer is a recent version of drbd. */
4316 if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4317 os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4318 real_peer_disk = D_UP_TO_DATE;
4320 if (ns.conn == C_WF_REPORT_PARAMS)
4321 ns.conn = C_CONNECTED;
4323 if (peer_state.conn == C_AHEAD)
4324 ns.conn = C_BEHIND;
4326 if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4327 get_ldev_if_state(device, D_NEGOTIATING)) {
4328 int cr; /* consider resync */
4330 /* if we established a new connection */
4331 cr = (os.conn < C_CONNECTED);
4332 /* if we had an established connection
4333 * and one of the nodes newly attaches a disk */
4334 cr |= (os.conn == C_CONNECTED &&
4335 (peer_state.disk == D_NEGOTIATING ||
4336 os.disk == D_NEGOTIATING));
4337 /* if we have both been inconsistent, and the peer has been
4338 * forced to be UpToDate with --overwrite-data */
4339 cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4340 /* if we had been plain connected, and the admin requested to
4341 * start a sync by "invalidate" or "invalidate-remote" */
4342 cr |= (os.conn == C_CONNECTED &&
4343 (peer_state.conn >= C_STARTING_SYNC_S &&
4344 peer_state.conn <= C_WF_BITMAP_T));
4346 if (cr)
4347 ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
4349 put_ldev(device);
4350 if (ns.conn == C_MASK) {
4351 ns.conn = C_CONNECTED;
4352 if (device->state.disk == D_NEGOTIATING) {
4353 drbd_force_state(device, NS(disk, D_FAILED));
4354 } else if (peer_state.disk == D_NEGOTIATING) {
4355 drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4356 peer_state.disk = D_DISKLESS;
4357 real_peer_disk = D_DISKLESS;
4358 } else {
4359 if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4360 return -EIO;
4361 D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4362 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4363 return -EIO;
4368 spin_lock_irq(&device->resource->req_lock);
4369 if (os.i != drbd_read_state(device).i)
4370 goto retry;
4371 clear_bit(CONSIDER_RESYNC, &device->flags);
4372 ns.peer = peer_state.role;
4373 ns.pdsk = real_peer_disk;
4374 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4375 if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4376 ns.disk = device->new_state_tmp.disk;
4377 cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4378 if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4379 test_bit(NEW_CUR_UUID, &device->flags)) {
4380 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4381 for temporal network outages! */
4382 spin_unlock_irq(&device->resource->req_lock);
4383 drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4384 tl_clear(peer_device->connection);
4385 drbd_uuid_new_current(device);
4386 clear_bit(NEW_CUR_UUID, &device->flags);
4387 conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4388 return -EIO;
4390 rv = _drbd_set_state(device, ns, cs_flags, NULL);
4391 ns = drbd_read_state(device);
4392 spin_unlock_irq(&device->resource->req_lock);
4394 if (rv < SS_SUCCESS) {
4395 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4396 return -EIO;
4399 if (os.conn > C_WF_REPORT_PARAMS) {
4400 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4401 peer_state.disk != D_NEGOTIATING ) {
4402 /* we want resync, peer has not yet decided to sync... */
4403 /* Nowadays only used when forcing a node into primary role and
4404 setting its disk to UpToDate with that */
4405 drbd_send_uuids(peer_device);
4406 drbd_send_current_state(peer_device);
4410 clear_bit(DISCARD_MY_DATA, &device->flags);
4412 drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4414 return 0;
4417 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4419 struct drbd_peer_device *peer_device;
4420 struct drbd_device *device;
4421 struct p_rs_uuid *p = pi->data;
4423 peer_device = conn_peer_device(connection, pi->vnr);
4424 if (!peer_device)
4425 return -EIO;
4426 device = peer_device->device;
4428 wait_event(device->misc_wait,
4429 device->state.conn == C_WF_SYNC_UUID ||
4430 device->state.conn == C_BEHIND ||
4431 device->state.conn < C_CONNECTED ||
4432 device->state.disk < D_NEGOTIATING);
4434 /* D_ASSERT(device, device->state.conn == C_WF_SYNC_UUID ); */
4436 /* Here the _drbd_uuid_ functions are right, current should
4437 _not_ be rotated into the history */
4438 if (get_ldev_if_state(device, D_NEGOTIATING)) {
4439 _drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4440 _drbd_uuid_set(device, UI_BITMAP, 0UL);
4442 drbd_print_uuids(device, "updated sync uuid");
4443 drbd_start_resync(device, C_SYNC_TARGET);
4445 put_ldev(device);
4446 } else
4447 drbd_err(device, "Ignoring SyncUUID packet!\n");
4449 return 0;
4453 * receive_bitmap_plain
4455 * Return 0 when done, 1 when another iteration is needed, and a negative error
4456 * code upon failure.
4458 static int
4459 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4460 unsigned long *p, struct bm_xfer_ctx *c)
4462 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4463 drbd_header_size(peer_device->connection);
4464 unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4465 c->bm_words - c->word_offset);
4466 unsigned int want = num_words * sizeof(*p);
4467 int err;
4469 if (want != size) {
4470 drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4471 return -EIO;
4473 if (want == 0)
4474 return 0;
4475 err = drbd_recv_all(peer_device->connection, p, want);
4476 if (err)
4477 return err;
4479 drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4481 c->word_offset += num_words;
4482 c->bit_offset = c->word_offset * BITS_PER_LONG;
4483 if (c->bit_offset > c->bm_bits)
4484 c->bit_offset = c->bm_bits;
4486 return 1;
4489 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4491 return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4494 static int dcbp_get_start(struct p_compressed_bm *p)
4496 return (p->encoding & 0x80) != 0;
4499 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4501 return (p->encoding >> 4) & 0x7;
4505 * recv_bm_rle_bits
4507 * Return 0 when done, 1 when another iteration is needed, and a negative error
4508 * code upon failure.
4510 static int
4511 recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4512 struct p_compressed_bm *p,
4513 struct bm_xfer_ctx *c,
4514 unsigned int len)
4516 struct bitstream bs;
4517 u64 look_ahead;
4518 u64 rl;
4519 u64 tmp;
4520 unsigned long s = c->bit_offset;
4521 unsigned long e;
4522 int toggle = dcbp_get_start(p);
4523 int have;
4524 int bits;
4526 bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4528 bits = bitstream_get_bits(&bs, &look_ahead, 64);
4529 if (bits < 0)
4530 return -EIO;
4532 for (have = bits; have > 0; s += rl, toggle = !toggle) {
4533 bits = vli_decode_bits(&rl, look_ahead);
4534 if (bits <= 0)
4535 return -EIO;
4537 if (toggle) {
4538 e = s + rl -1;
4539 if (e >= c->bm_bits) {
4540 drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4541 return -EIO;
4543 _drbd_bm_set_bits(peer_device->device, s, e);
4546 if (have < bits) {
4547 drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4548 have, bits, look_ahead,
4549 (unsigned int)(bs.cur.b - p->code),
4550 (unsigned int)bs.buf_len);
4551 return -EIO;
4553 /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4554 if (likely(bits < 64))
4555 look_ahead >>= bits;
4556 else
4557 look_ahead = 0;
4558 have -= bits;
4560 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4561 if (bits < 0)
4562 return -EIO;
4563 look_ahead |= tmp << have;
4564 have += bits;
4567 c->bit_offset = s;
4568 bm_xfer_ctx_bit_to_word_offset(c);
4570 return (s != c->bm_bits);
4574 * decode_bitmap_c
4576 * Return 0 when done, 1 when another iteration is needed, and a negative error
4577 * code upon failure.
4579 static int
4580 decode_bitmap_c(struct drbd_peer_device *peer_device,
4581 struct p_compressed_bm *p,
4582 struct bm_xfer_ctx *c,
4583 unsigned int len)
4585 if (dcbp_get_code(p) == RLE_VLI_Bits)
4586 return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4588 /* other variants had been implemented for evaluation,
4589 * but have been dropped as this one turned out to be "best"
4590 * during all our tests. */
4592 drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4593 conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4594 return -EIO;
4597 void INFO_bm_xfer_stats(struct drbd_device *device,
4598 const char *direction, struct bm_xfer_ctx *c)
4600 /* what would it take to transfer it "plaintext" */
4601 unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4602 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4603 unsigned int plain =
4604 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4605 c->bm_words * sizeof(unsigned long);
4606 unsigned int total = c->bytes[0] + c->bytes[1];
4607 unsigned int r;
4609 /* total can not be zero. but just in case: */
4610 if (total == 0)
4611 return;
4613 /* don't report if not compressed */
4614 if (total >= plain)
4615 return;
4617 /* total < plain. check for overflow, still */
4618 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4619 : (1000 * total / plain);
4621 if (r > 1000)
4622 r = 1000;
4624 r = 1000 - r;
4625 drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4626 "total %u; compression: %u.%u%%\n",
4627 direction,
4628 c->bytes[1], c->packets[1],
4629 c->bytes[0], c->packets[0],
4630 total, r/10, r % 10);
4633 /* Since we are processing the bitfield from lower addresses to higher,
4634 it does not matter if the process it in 32 bit chunks or 64 bit
4635 chunks as long as it is little endian. (Understand it as byte stream,
4636 beginning with the lowest byte...) If we would use big endian
4637 we would need to process it from the highest address to the lowest,
4638 in order to be agnostic to the 32 vs 64 bits issue.
4640 returns 0 on failure, 1 if we successfully received it. */
4641 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4643 struct drbd_peer_device *peer_device;
4644 struct drbd_device *device;
4645 struct bm_xfer_ctx c;
4646 int err;
4648 peer_device = conn_peer_device(connection, pi->vnr);
4649 if (!peer_device)
4650 return -EIO;
4651 device = peer_device->device;
4653 drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4654 /* you are supposed to send additional out-of-sync information
4655 * if you actually set bits during this phase */
4657 c = (struct bm_xfer_ctx) {
4658 .bm_bits = drbd_bm_bits(device),
4659 .bm_words = drbd_bm_words(device),
4662 for(;;) {
4663 if (pi->cmd == P_BITMAP)
4664 err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4665 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4666 /* MAYBE: sanity check that we speak proto >= 90,
4667 * and the feature is enabled! */
4668 struct p_compressed_bm *p = pi->data;
4670 if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4671 drbd_err(device, "ReportCBitmap packet too large\n");
4672 err = -EIO;
4673 goto out;
4675 if (pi->size <= sizeof(*p)) {
4676 drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4677 err = -EIO;
4678 goto out;
4680 err = drbd_recv_all(peer_device->connection, p, pi->size);
4681 if (err)
4682 goto out;
4683 err = decode_bitmap_c(peer_device, p, &c, pi->size);
4684 } else {
4685 drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
4686 err = -EIO;
4687 goto out;
4690 c.packets[pi->cmd == P_BITMAP]++;
4691 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4693 if (err <= 0) {
4694 if (err < 0)
4695 goto out;
4696 break;
4698 err = drbd_recv_header(peer_device->connection, pi);
4699 if (err)
4700 goto out;
4703 INFO_bm_xfer_stats(device, "receive", &c);
4705 if (device->state.conn == C_WF_BITMAP_T) {
4706 enum drbd_state_rv rv;
4708 err = drbd_send_bitmap(device);
4709 if (err)
4710 goto out;
4711 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4712 rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4713 D_ASSERT(device, rv == SS_SUCCESS);
4714 } else if (device->state.conn != C_WF_BITMAP_S) {
4715 /* admin may have requested C_DISCONNECTING,
4716 * other threads may have noticed network errors */
4717 drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4718 drbd_conn_str(device->state.conn));
4720 err = 0;
4722 out:
4723 drbd_bm_unlock(device);
4724 if (!err && device->state.conn == C_WF_BITMAP_S)
4725 drbd_start_resync(device, C_SYNC_SOURCE);
4726 return err;
4729 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4731 drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4732 pi->cmd, pi->size);
4734 return ignore_remaining_packet(connection, pi);
4737 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4739 /* Make sure we've acked all the TCP data associated
4740 * with the data requests being unplugged */
4741 drbd_tcp_quickack(connection->data.socket);
4743 return 0;
4746 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4748 struct drbd_peer_device *peer_device;
4749 struct drbd_device *device;
4750 struct p_block_desc *p = pi->data;
4752 peer_device = conn_peer_device(connection, pi->vnr);
4753 if (!peer_device)
4754 return -EIO;
4755 device = peer_device->device;
4757 switch (device->state.conn) {
4758 case C_WF_SYNC_UUID:
4759 case C_WF_BITMAP_T:
4760 case C_BEHIND:
4761 break;
4762 default:
4763 drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4764 drbd_conn_str(device->state.conn));
4767 drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4769 return 0;
4772 static int receive_rs_deallocated(struct drbd_connection *connection, struct packet_info *pi)
4774 struct drbd_peer_device *peer_device;
4775 struct p_block_desc *p = pi->data;
4776 struct drbd_device *device;
4777 sector_t sector;
4778 int size, err = 0;
4780 peer_device = conn_peer_device(connection, pi->vnr);
4781 if (!peer_device)
4782 return -EIO;
4783 device = peer_device->device;
4785 sector = be64_to_cpu(p->sector);
4786 size = be32_to_cpu(p->blksize);
4788 dec_rs_pending(device);
4790 if (get_ldev(device)) {
4791 struct drbd_peer_request *peer_req;
4792 const int op = REQ_OP_WRITE_ZEROES;
4794 peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER, sector,
4795 size, 0, GFP_NOIO);
4796 if (!peer_req) {
4797 put_ldev(device);
4798 return -ENOMEM;
4801 peer_req->w.cb = e_end_resync_block;
4802 peer_req->submit_jif = jiffies;
4803 peer_req->flags |= EE_IS_TRIM;
4805 spin_lock_irq(&device->resource->req_lock);
4806 list_add_tail(&peer_req->w.list, &device->sync_ee);
4807 spin_unlock_irq(&device->resource->req_lock);
4809 atomic_add(pi->size >> 9, &device->rs_sect_ev);
4810 err = drbd_submit_peer_request(device, peer_req, op, 0, DRBD_FAULT_RS_WR);
4812 if (err) {
4813 spin_lock_irq(&device->resource->req_lock);
4814 list_del(&peer_req->w.list);
4815 spin_unlock_irq(&device->resource->req_lock);
4817 drbd_free_peer_req(device, peer_req);
4818 put_ldev(device);
4819 err = 0;
4820 goto fail;
4823 inc_unacked(device);
4825 /* No put_ldev() here. Gets called in drbd_endio_write_sec_final(),
4826 as well as drbd_rs_complete_io() */
4827 } else {
4828 fail:
4829 drbd_rs_complete_io(device, sector);
4830 drbd_send_ack_ex(peer_device, P_NEG_ACK, sector, size, ID_SYNCER);
4833 atomic_add(size >> 9, &device->rs_sect_in);
4835 return err;
/*
 * One entry of the receiver's dispatch table, describing how a packet type
 * arriving on the data socket is to be handled.
 */
struct data_cmd {
	/* non-zero if the packet may carry payload beyond its sub header */
	int expect_payload;
	/* size of the sub header that is read before the handler is called */
	unsigned int pkt_size;
	/* handler; a zero return value means success */
	int (*fn)(struct drbd_connection *, struct packet_info *);
};
4844 static struct data_cmd drbd_cmd_handler[] = {
4845 [P_DATA] = { 1, sizeof(struct p_data), receive_Data },
4846 [P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply },
4847 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4848 [P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4849 [P_BITMAP] = { 1, 0, receive_bitmap } ,
4850 [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4851 [P_UNPLUG_REMOTE] = { 0, 0, receive_UnplugRemote },
4852 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4853 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4854 [P_SYNC_PARAM] = { 1, 0, receive_SyncParam },
4855 [P_SYNC_PARAM89] = { 1, 0, receive_SyncParam },
4856 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol },
4857 [P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids },
4858 [P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes },
4859 [P_STATE] = { 0, sizeof(struct p_state), receive_state },
4860 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state },
4861 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4862 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4863 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4864 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4865 [P_RS_THIN_REQ] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4866 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip },
4867 [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4868 [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4869 [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4870 [P_TRIM] = { 0, sizeof(struct p_trim), receive_Data },
4871 [P_RS_DEALLOCATED] = { 0, sizeof(struct p_block_desc), receive_rs_deallocated },
4872 [P_WSAME] = { 1, sizeof(struct p_wsame), receive_Data },
4875 static void drbdd(struct drbd_connection *connection)
4877 struct packet_info pi;
4878 size_t shs; /* sub header size */
4879 int err;
4881 while (get_t_state(&connection->receiver) == RUNNING) {
4882 struct data_cmd const *cmd;
4884 drbd_thread_current_set_cpu(&connection->receiver);
4885 update_receiver_timing_details(connection, drbd_recv_header);
4886 if (drbd_recv_header(connection, &pi))
4887 goto err_out;
4889 cmd = &drbd_cmd_handler[pi.cmd];
4890 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4891 drbd_err(connection, "Unexpected data packet %s (0x%04x)",
4892 cmdname(pi.cmd), pi.cmd);
4893 goto err_out;
4896 shs = cmd->pkt_size;
4897 if (pi.cmd == P_SIZES && connection->agreed_features & DRBD_FF_WSAME)
4898 shs += sizeof(struct o_qlim);
4899 if (pi.size > shs && !cmd->expect_payload) {
4900 drbd_err(connection, "No payload expected %s l:%d\n",
4901 cmdname(pi.cmd), pi.size);
4902 goto err_out;
4904 if (pi.size < shs) {
4905 drbd_err(connection, "%s: unexpected packet size, expected:%d received:%d\n",
4906 cmdname(pi.cmd), (int)shs, pi.size);
4907 goto err_out;
4910 if (shs) {
4911 update_receiver_timing_details(connection, drbd_recv_all_warn);
4912 err = drbd_recv_all_warn(connection, pi.data, shs);
4913 if (err)
4914 goto err_out;
4915 pi.size -= shs;
4918 update_receiver_timing_details(connection, cmd->fn);
4919 err = cmd->fn(connection, &pi);
4920 if (err) {
4921 drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
4922 cmdname(pi.cmd), err, pi.size);
4923 goto err_out;
4926 return;
4928 err_out:
4929 conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4932 static void conn_disconnect(struct drbd_connection *connection)
4934 struct drbd_peer_device *peer_device;
4935 enum drbd_conns oc;
4936 int vnr;
4938 if (connection->cstate == C_STANDALONE)
4939 return;
4941 /* We are about to start the cleanup after connection loss.
4942 * Make sure drbd_make_request knows about that.
4943 * Usually we should be in some network failure state already,
4944 * but just in case we are not, we fix it up here.
4946 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4948 /* ack_receiver does not clean up anything. it must not interfere, either */
4949 drbd_thread_stop(&connection->ack_receiver);
4950 if (connection->ack_sender) {
4951 destroy_workqueue(connection->ack_sender);
4952 connection->ack_sender = NULL;
4954 drbd_free_sock(connection);
4956 rcu_read_lock();
4957 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
4958 struct drbd_device *device = peer_device->device;
4959 kref_get(&device->kref);
4960 rcu_read_unlock();
4961 drbd_disconnected(peer_device);
4962 kref_put(&device->kref, drbd_destroy_device);
4963 rcu_read_lock();
4965 rcu_read_unlock();
4967 if (!list_empty(&connection->current_epoch->list))
4968 drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
4969 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4970 atomic_set(&connection->current_epoch->epoch_size, 0);
4971 connection->send.seen_any_write_yet = false;
4973 drbd_info(connection, "Connection closed\n");
4975 if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
4976 conn_try_outdate_peer_async(connection);
4978 spin_lock_irq(&connection->resource->req_lock);
4979 oc = connection->cstate;
4980 if (oc >= C_UNCONNECTED)
4981 _conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4983 spin_unlock_irq(&connection->resource->req_lock);
4985 if (oc == C_DISCONNECTING)
4986 conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4989 static int drbd_disconnected(struct drbd_peer_device *peer_device)
4991 struct drbd_device *device = peer_device->device;
4992 unsigned int i;
4994 /* wait for current activity to cease. */
4995 spin_lock_irq(&device->resource->req_lock);
4996 _drbd_wait_ee_list_empty(device, &device->active_ee);
4997 _drbd_wait_ee_list_empty(device, &device->sync_ee);
4998 _drbd_wait_ee_list_empty(device, &device->read_ee);
4999 spin_unlock_irq(&device->resource->req_lock);
5001 /* We do not have data structures that would allow us to
5002 * get the rs_pending_cnt down to 0 again.
5003 * * On C_SYNC_TARGET we do not have any data structures describing
5004 * the pending RSDataRequest's we have sent.
5005 * * On C_SYNC_SOURCE there is no data structure that tracks
5006 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
5007 * And no, it is not the sum of the reference counts in the
5008 * resync_LRU. The resync_LRU tracks the whole operation including
5009 * the disk-IO, while the rs_pending_cnt only tracks the blocks
5010 * on the fly. */
5011 drbd_rs_cancel_all(device);
5012 device->rs_total = 0;
5013 device->rs_failed = 0;
5014 atomic_set(&device->rs_pending_cnt, 0);
5015 wake_up(&device->misc_wait);
5017 del_timer_sync(&device->resync_timer);
5018 resync_timer_fn((unsigned long)device);
5020 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
5021 * w_make_resync_request etc. which may still be on the worker queue
5022 * to be "canceled" */
5023 drbd_flush_workqueue(&peer_device->connection->sender_work);
5025 drbd_finish_peer_reqs(device);
5027 /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
5028 might have issued a work again. The one before drbd_finish_peer_reqs() is
5029 necessary to reclain net_ee in drbd_finish_peer_reqs(). */
5030 drbd_flush_workqueue(&peer_device->connection->sender_work);
5032 /* need to do it again, drbd_finish_peer_reqs() may have populated it
5033 * again via drbd_try_clear_on_disk_bm(). */
5034 drbd_rs_cancel_all(device);
5036 kfree(device->p_uuid);
5037 device->p_uuid = NULL;
5039 if (!drbd_suspended(device))
5040 tl_clear(peer_device->connection);
5042 drbd_md_sync(device);
5044 if (get_ldev(device)) {
5045 drbd_bitmap_io(device, &drbd_bm_write_copy_pages,
5046 "write from disconnected", BM_LOCKED_CHANGE_ALLOWED);
5047 put_ldev(device);
5050 /* tcp_close and release of sendpage pages can be deferred. I don't
5051 * want to use SO_LINGER, because apparently it can be deferred for
5052 * more than 20 seconds (longest time I checked).
5054 * Actually we don't care for exactly when the network stack does its
5055 * put_page(), but release our reference on these pages right here.
5057 i = drbd_free_peer_reqs(device, &device->net_ee);
5058 if (i)
5059 drbd_info(device, "net_ee not empty, killed %u entries\n", i);
5060 i = atomic_read(&device->pp_in_use_by_net);
5061 if (i)
5062 drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
5063 i = atomic_read(&device->pp_in_use);
5064 if (i)
5065 drbd_info(device, "pp_in_use = %d, expected 0\n", i);
5067 D_ASSERT(device, list_empty(&device->read_ee));
5068 D_ASSERT(device, list_empty(&device->active_ee));
5069 D_ASSERT(device, list_empty(&device->sync_ee));
5070 D_ASSERT(device, list_empty(&device->done_ee));
5072 return 0;
5076 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
5077 * we can agree on is stored in agreed_pro_version.
5079 * feature flags and the reserved array should be enough room for future
5080 * enhancements of the handshake protocol, and possible plugins...
5082 * for now, they are expected to be zero, but ignored.
5084 static int drbd_send_features(struct drbd_connection *connection)
5086 struct drbd_socket *sock;
5087 struct p_connection_features *p;
5089 sock = &connection->data;
5090 p = conn_prepare_command(connection, sock);
5091 if (!p)
5092 return -EIO;
5093 memset(p, 0, sizeof(*p));
5094 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
5095 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
5096 p->feature_flags = cpu_to_be32(PRO_FEATURES);
5097 return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
5101 * return values:
5102 * 1 yes, we have a valid connection
5103 * 0 oops, did not work out, please try again
5104 * -1 peer talks different language,
5105 * no point in trying again, please go standalone.
5107 static int drbd_do_features(struct drbd_connection *connection)
5109 /* ASSERT current == connection->receiver ... */
5110 struct p_connection_features *p;
5111 const int expect = sizeof(struct p_connection_features);
5112 struct packet_info pi;
5113 int err;
5115 err = drbd_send_features(connection);
5116 if (err)
5117 return 0;
5119 err = drbd_recv_header(connection, &pi);
5120 if (err)
5121 return 0;
5123 if (pi.cmd != P_CONNECTION_FEATURES) {
5124 drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
5125 cmdname(pi.cmd), pi.cmd);
5126 return -1;
5129 if (pi.size != expect) {
5130 drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
5131 expect, pi.size);
5132 return -1;
5135 p = pi.data;
5136 err = drbd_recv_all_warn(connection, p, expect);
5137 if (err)
5138 return 0;
5140 p->protocol_min = be32_to_cpu(p->protocol_min);
5141 p->protocol_max = be32_to_cpu(p->protocol_max);
5142 if (p->protocol_max == 0)
5143 p->protocol_max = p->protocol_min;
5145 if (PRO_VERSION_MAX < p->protocol_min ||
5146 PRO_VERSION_MIN > p->protocol_max)
5147 goto incompat;
5149 connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
5150 connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
5152 drbd_info(connection, "Handshake successful: "
5153 "Agreed network protocol version %d\n", connection->agreed_pro_version);
5155 drbd_info(connection, "Feature flags enabled on protocol level: 0x%x%s%s%s.\n",
5156 connection->agreed_features,
5157 connection->agreed_features & DRBD_FF_TRIM ? " TRIM" : "",
5158 connection->agreed_features & DRBD_FF_THIN_RESYNC ? " THIN_RESYNC" : "",
5159 connection->agreed_features & DRBD_FF_WSAME ? " WRITE_SAME" :
5160 connection->agreed_features ? "" : " none");
5162 return 1;
5164 incompat:
5165 drbd_err(connection, "incompatible DRBD dialects: "
5166 "I support %d-%d, peer supports %d-%d\n",
5167 PRO_VERSION_MIN, PRO_VERSION_MAX,
5168 p->protocol_min, p->protocol_max);
5169 return -1;
5172 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
/*
 * Variant used when the kernel lacks CONFIG_CRYPTO_HMAC: authentication
 * cannot be performed, so log why and return -1.
 */
static int drbd_do_auth(struct drbd_connection *connection)
{
	drbd_err(connection, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
	drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");

	return -1;
}
5179 #else
5180 #define CHALLENGE_LEN 64
5182 /* Return value:
5183 1 - auth succeeded,
5184 0 - failed, try again (network error),
5185 -1 - auth failed, don't try again.
5188 static int drbd_do_auth(struct drbd_connection *connection)
5190 struct drbd_socket *sock;
5191 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
5192 char *response = NULL;
5193 char *right_response = NULL;
5194 char *peers_ch = NULL;
5195 unsigned int key_len;
5196 char secret[SHARED_SECRET_MAX]; /* 64 byte */
5197 unsigned int resp_size;
5198 SHASH_DESC_ON_STACK(desc, connection->cram_hmac_tfm);
5199 struct packet_info pi;
5200 struct net_conf *nc;
5201 int err, rv;
5203 /* FIXME: Put the challenge/response into the preallocated socket buffer. */
5205 rcu_read_lock();
5206 nc = rcu_dereference(connection->net_conf);
5207 key_len = strlen(nc->shared_secret);
5208 memcpy(secret, nc->shared_secret, key_len);
5209 rcu_read_unlock();
5211 desc->tfm = connection->cram_hmac_tfm;
5212 desc->flags = 0;
5214 rv = crypto_shash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
5215 if (rv) {
5216 drbd_err(connection, "crypto_shash_setkey() failed with %d\n", rv);
5217 rv = -1;
5218 goto fail;
5221 get_random_bytes(my_challenge, CHALLENGE_LEN);
5223 sock = &connection->data;
5224 if (!conn_prepare_command(connection, sock)) {
5225 rv = 0;
5226 goto fail;
5228 rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
5229 my_challenge, CHALLENGE_LEN);
5230 if (!rv)
5231 goto fail;
5233 err = drbd_recv_header(connection, &pi);
5234 if (err) {
5235 rv = 0;
5236 goto fail;
5239 if (pi.cmd != P_AUTH_CHALLENGE) {
5240 drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
5241 cmdname(pi.cmd), pi.cmd);
5242 rv = 0;
5243 goto fail;
5246 if (pi.size > CHALLENGE_LEN * 2) {
5247 drbd_err(connection, "expected AuthChallenge payload too big.\n");
5248 rv = -1;
5249 goto fail;
5252 if (pi.size < CHALLENGE_LEN) {
5253 drbd_err(connection, "AuthChallenge payload too small.\n");
5254 rv = -1;
5255 goto fail;
5258 peers_ch = kmalloc(pi.size, GFP_NOIO);
5259 if (peers_ch == NULL) {
5260 drbd_err(connection, "kmalloc of peers_ch failed\n");
5261 rv = -1;
5262 goto fail;
5265 err = drbd_recv_all_warn(connection, peers_ch, pi.size);
5266 if (err) {
5267 rv = 0;
5268 goto fail;
5271 if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
5272 drbd_err(connection, "Peer presented the same challenge!\n");
5273 rv = -1;
5274 goto fail;
5277 resp_size = crypto_shash_digestsize(connection->cram_hmac_tfm);
5278 response = kmalloc(resp_size, GFP_NOIO);
5279 if (response == NULL) {
5280 drbd_err(connection, "kmalloc of response failed\n");
5281 rv = -1;
5282 goto fail;
5285 rv = crypto_shash_digest(desc, peers_ch, pi.size, response);
5286 if (rv) {
5287 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5288 rv = -1;
5289 goto fail;
5292 if (!conn_prepare_command(connection, sock)) {
5293 rv = 0;
5294 goto fail;
5296 rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
5297 response, resp_size);
5298 if (!rv)
5299 goto fail;
5301 err = drbd_recv_header(connection, &pi);
5302 if (err) {
5303 rv = 0;
5304 goto fail;
5307 if (pi.cmd != P_AUTH_RESPONSE) {
5308 drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
5309 cmdname(pi.cmd), pi.cmd);
5310 rv = 0;
5311 goto fail;
5314 if (pi.size != resp_size) {
5315 drbd_err(connection, "expected AuthResponse payload of wrong size\n");
5316 rv = 0;
5317 goto fail;
5320 err = drbd_recv_all_warn(connection, response , resp_size);
5321 if (err) {
5322 rv = 0;
5323 goto fail;
5326 right_response = kmalloc(resp_size, GFP_NOIO);
5327 if (right_response == NULL) {
5328 drbd_err(connection, "kmalloc of right_response failed\n");
5329 rv = -1;
5330 goto fail;
5333 rv = crypto_shash_digest(desc, my_challenge, CHALLENGE_LEN,
5334 right_response);
5335 if (rv) {
5336 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5337 rv = -1;
5338 goto fail;
5341 rv = !memcmp(response, right_response, resp_size);
5343 if (rv)
5344 drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
5345 resp_size);
5346 else
5347 rv = -1;
5349 fail:
5350 kfree(peers_ch);
5351 kfree(response);
5352 kfree(right_response);
5353 shash_desc_zero(desc);
5355 return rv;
5357 #endif
5359 int drbd_receiver(struct drbd_thread *thi)
5361 struct drbd_connection *connection = thi->connection;
5362 int h;
5364 drbd_info(connection, "receiver (re)started\n");
5366 do {
5367 h = conn_connect(connection);
5368 if (h == 0) {
5369 conn_disconnect(connection);
5370 schedule_timeout_interruptible(HZ);
5372 if (h == -1) {
5373 drbd_warn(connection, "Discarding network configuration.\n");
5374 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5376 } while (h == 0);
5378 if (h > 0)
5379 drbdd(connection);
5381 conn_disconnect(connection);
5383 drbd_info(connection, "receiver terminated\n");
5384 return 0;
5387 /* ********* acknowledge sender ******** */
5389 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5391 struct p_req_state_reply *p = pi->data;
5392 int retcode = be32_to_cpu(p->retcode);
5394 if (retcode >= SS_SUCCESS) {
5395 set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
5396 } else {
5397 set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5398 drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5399 drbd_set_st_err_str(retcode), retcode);
5401 wake_up(&connection->ping_wait);
5403 return 0;
5406 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5408 struct drbd_peer_device *peer_device;
5409 struct drbd_device *device;
5410 struct p_req_state_reply *p = pi->data;
5411 int retcode = be32_to_cpu(p->retcode);
5413 peer_device = conn_peer_device(connection, pi->vnr);
5414 if (!peer_device)
5415 return -EIO;
5416 device = peer_device->device;
5418 if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5419 D_ASSERT(device, connection->agreed_pro_version < 100);
5420 return got_conn_RqSReply(connection, pi);
5423 if (retcode >= SS_SUCCESS) {
5424 set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5425 } else {
5426 set_bit(CL_ST_CHG_FAIL, &device->flags);
5427 drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5428 drbd_set_st_err_str(retcode), retcode);
5430 wake_up(&device->state_wait);
5432 return 0;
/* Answer a P_PING with a ping ack; returns drbd_send_ping_ack()'s result. */
static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
{
	int err = drbd_send_ping_ack(connection);

	return err;
}
5441 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5443 /* restore idle timeout */
5444 connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
5445 if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5446 wake_up(&connection->ping_wait);
5448 return 0;
5451 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5453 struct drbd_peer_device *peer_device;
5454 struct drbd_device *device;
5455 struct p_block_ack *p = pi->data;
5456 sector_t sector = be64_to_cpu(p->sector);
5457 int blksize = be32_to_cpu(p->blksize);
5459 peer_device = conn_peer_device(connection, pi->vnr);
5460 if (!peer_device)
5461 return -EIO;
5462 device = peer_device->device;
5464 D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5466 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5468 if (get_ldev(device)) {
5469 drbd_rs_complete_io(device, sector);
5470 drbd_set_in_sync(device, sector, blksize);
5471 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5472 device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5473 put_ldev(device);
5475 dec_rs_pending(device);
5476 atomic_add(blksize >> 9, &device->rs_sect_in);
5478 return 0;
5481 static int
5482 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5483 struct rb_root *root, const char *func,
5484 enum drbd_req_event what, bool missing_ok)
5486 struct drbd_request *req;
5487 struct bio_and_error m;
5489 spin_lock_irq(&device->resource->req_lock);
5490 req = find_request(device, root, id, sector, missing_ok, func);
5491 if (unlikely(!req)) {
5492 spin_unlock_irq(&device->resource->req_lock);
5493 return -EIO;
5495 __req_mod(req, what, &m);
5496 spin_unlock_irq(&device->resource->req_lock);
5498 if (m.bio)
5499 complete_master_bio(device, &m);
5500 return 0;
5503 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5505 struct drbd_peer_device *peer_device;
5506 struct drbd_device *device;
5507 struct p_block_ack *p = pi->data;
5508 sector_t sector = be64_to_cpu(p->sector);
5509 int blksize = be32_to_cpu(p->blksize);
5510 enum drbd_req_event what;
5512 peer_device = conn_peer_device(connection, pi->vnr);
5513 if (!peer_device)
5514 return -EIO;
5515 device = peer_device->device;
5517 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5519 if (p->block_id == ID_SYNCER) {
5520 drbd_set_in_sync(device, sector, blksize);
5521 dec_rs_pending(device);
5522 return 0;
5524 switch (pi->cmd) {
5525 case P_RS_WRITE_ACK:
5526 what = WRITE_ACKED_BY_PEER_AND_SIS;
5527 break;
5528 case P_WRITE_ACK:
5529 what = WRITE_ACKED_BY_PEER;
5530 break;
5531 case P_RECV_ACK:
5532 what = RECV_ACKED_BY_PEER;
5533 break;
5534 case P_SUPERSEDED:
5535 what = CONFLICT_RESOLVED;
5536 break;
5537 case P_RETRY_WRITE:
5538 what = POSTPONE_WRITE;
5539 break;
5540 default:
5541 BUG();
5544 return validate_req_change_req_state(device, p->block_id, sector,
5545 &device->write_requests, __func__,
5546 what, false);
5549 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5551 struct drbd_peer_device *peer_device;
5552 struct drbd_device *device;
5553 struct p_block_ack *p = pi->data;
5554 sector_t sector = be64_to_cpu(p->sector);
5555 int size = be32_to_cpu(p->blksize);
5556 int err;
5558 peer_device = conn_peer_device(connection, pi->vnr);
5559 if (!peer_device)
5560 return -EIO;
5561 device = peer_device->device;
5563 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5565 if (p->block_id == ID_SYNCER) {
5566 dec_rs_pending(device);
5567 drbd_rs_failed_io(device, sector, size);
5568 return 0;
5571 err = validate_req_change_req_state(device, p->block_id, sector,
5572 &device->write_requests, __func__,
5573 NEG_ACKED, true);
5574 if (err) {
5575 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5576 The master bio might already be completed, therefore the
5577 request is no longer in the collision hash. */
5578 /* In Protocol B we might already have got a P_RECV_ACK
5579 but then get a P_NEG_ACK afterwards. */
5580 drbd_set_out_of_sync(device, sector, size);
5582 return 0;
5585 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5587 struct drbd_peer_device *peer_device;
5588 struct drbd_device *device;
5589 struct p_block_ack *p = pi->data;
5590 sector_t sector = be64_to_cpu(p->sector);
5592 peer_device = conn_peer_device(connection, pi->vnr);
5593 if (!peer_device)
5594 return -EIO;
5595 device = peer_device->device;
5597 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5599 drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5600 (unsigned long long)sector, be32_to_cpu(p->blksize));
5602 return validate_req_change_req_state(device, p->block_id, sector,
5603 &device->read_requests, __func__,
5604 NEG_ACKED, false);
5607 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5609 struct drbd_peer_device *peer_device;
5610 struct drbd_device *device;
5611 sector_t sector;
5612 int size;
5613 struct p_block_ack *p = pi->data;
5615 peer_device = conn_peer_device(connection, pi->vnr);
5616 if (!peer_device)
5617 return -EIO;
5618 device = peer_device->device;
5620 sector = be64_to_cpu(p->sector);
5621 size = be32_to_cpu(p->blksize);
5623 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5625 dec_rs_pending(device);
5627 if (get_ldev_if_state(device, D_FAILED)) {
5628 drbd_rs_complete_io(device, sector);
5629 switch (pi->cmd) {
5630 case P_NEG_RS_DREPLY:
5631 drbd_rs_failed_io(device, sector, size);
5632 case P_RS_CANCEL:
5633 break;
5634 default:
5635 BUG();
5637 put_ldev(device);
5640 return 0;
5643 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5645 struct p_barrier_ack *p = pi->data;
5646 struct drbd_peer_device *peer_device;
5647 int vnr;
5649 tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5651 rcu_read_lock();
5652 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5653 struct drbd_device *device = peer_device->device;
5655 if (device->state.conn == C_AHEAD &&
5656 atomic_read(&device->ap_in_flight) == 0 &&
5657 !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5658 device->start_resync_timer.expires = jiffies + HZ;
5659 add_timer(&device->start_resync_timer);
5662 rcu_read_unlock();
5664 return 0;
5667 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5669 struct drbd_peer_device *peer_device;
5670 struct drbd_device *device;
5671 struct p_block_ack *p = pi->data;
5672 struct drbd_device_work *dw;
5673 sector_t sector;
5674 int size;
5676 peer_device = conn_peer_device(connection, pi->vnr);
5677 if (!peer_device)
5678 return -EIO;
5679 device = peer_device->device;
5681 sector = be64_to_cpu(p->sector);
5682 size = be32_to_cpu(p->blksize);
5684 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5686 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5687 drbd_ov_out_of_sync_found(device, sector, size);
5688 else
5689 ov_out_of_sync_print(device);
5691 if (!get_ldev(device))
5692 return 0;
5694 drbd_rs_complete_io(device, sector);
5695 dec_rs_pending(device);
5697 --device->ov_left;
5699 /* let's advance progress step marks only for every other megabyte */
5700 if ((device->ov_left & 0x200) == 0x200)
5701 drbd_advance_rs_marks(device, device->ov_left);
5703 if (device->ov_left == 0) {
5704 dw = kmalloc(sizeof(*dw), GFP_NOIO);
5705 if (dw) {
5706 dw->w.cb = w_ov_finished;
5707 dw->device = device;
5708 drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5709 } else {
5710 drbd_err(device, "kmalloc(dw) failed.");
5711 ov_out_of_sync_print(device);
5712 drbd_resync_finished(device);
5715 put_ldev(device);
5716 return 0;
/* Handler for meta packets that are received but deliberately ignored. */
static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
{
	/* nothing to do, report success */
	return 0;
}
5724 struct meta_sock_cmd {
5725 size_t pkt_size;
5726 int (*fn)(struct drbd_connection *connection, struct packet_info *);
5729 static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout)
5731 long t;
5732 struct net_conf *nc;
5734 rcu_read_lock();
5735 nc = rcu_dereference(connection->net_conf);
5736 t = ping_timeout ? nc->ping_timeo : nc->ping_int;
5737 rcu_read_unlock();
5739 t *= HZ;
5740 if (ping_timeout)
5741 t /= 10;
5743 connection->meta.socket->sk->sk_rcvtimeo = t;
5746 static void set_ping_timeout(struct drbd_connection *connection)
5748 set_rcvtimeo(connection, 1);
5751 static void set_idle_timeout(struct drbd_connection *connection)
5753 set_rcvtimeo(connection, 0);
5756 static struct meta_sock_cmd ack_receiver_tbl[] = {
5757 [P_PING] = { 0, got_Ping },
5758 [P_PING_ACK] = { 0, got_PingAck },
5759 [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5760 [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5761 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5762 [P_SUPERSEDED] = { sizeof(struct p_block_ack), got_BlockAck },
5763 [P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck },
5764 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply },
5765 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply },
5766 [P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult },
5767 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck },
5768 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5769 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync },
5770 [P_DELAY_PROBE] = { sizeof(struct p_delay_probe93), got_skip },
5771 [P_RS_CANCEL] = { sizeof(struct p_block_ack), got_NegRSDReply },
5772 [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
5773 [P_RETRY_WRITE] = { sizeof(struct p_block_ack), got_BlockAck },
5776 int drbd_ack_receiver(struct drbd_thread *thi)
5778 struct drbd_connection *connection = thi->connection;
5779 struct meta_sock_cmd *cmd = NULL;
5780 struct packet_info pi;
5781 unsigned long pre_recv_jif;
5782 int rv;
5783 void *buf = connection->meta.rbuf;
5784 int received = 0;
5785 unsigned int header_size = drbd_header_size(connection);
5786 int expect = header_size;
5787 bool ping_timeout_active = false;
5788 struct sched_param param = { .sched_priority = 2 };
5790 rv = sched_setscheduler(current, SCHED_RR, &param);
5791 if (rv < 0)
5792 drbd_err(connection, "drbd_ack_receiver: ERROR set priority, ret=%d\n", rv);
5794 while (get_t_state(thi) == RUNNING) {
5795 drbd_thread_current_set_cpu(thi);
5797 conn_reclaim_net_peer_reqs(connection);
5799 if (test_and_clear_bit(SEND_PING, &connection->flags)) {
5800 if (drbd_send_ping(connection)) {
5801 drbd_err(connection, "drbd_send_ping has failed\n");
5802 goto reconnect;
5804 set_ping_timeout(connection);
5805 ping_timeout_active = true;
5808 pre_recv_jif = jiffies;
5809 rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
5811 /* Note:
5812 * -EINTR (on meta) we got a signal
5813 * -EAGAIN (on meta) rcvtimeo expired
5814 * -ECONNRESET other side closed the connection
5815 * -ERESTARTSYS (on data) we got a signal
5816 * rv < 0 other than above: unexpected error!
5817 * rv == expected: full header or command
5818 * rv < expected: "woken" by signal during receive
5819 * rv == 0 : "connection shut down by peer"
5821 if (likely(rv > 0)) {
5822 received += rv;
5823 buf += rv;
5824 } else if (rv == 0) {
5825 if (test_bit(DISCONNECT_SENT, &connection->flags)) {
5826 long t;
5827 rcu_read_lock();
5828 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
5829 rcu_read_unlock();
5831 t = wait_event_timeout(connection->ping_wait,
5832 connection->cstate < C_WF_REPORT_PARAMS,
5834 if (t)
5835 break;
5837 drbd_err(connection, "meta connection shut down by peer.\n");
5838 goto reconnect;
5839 } else if (rv == -EAGAIN) {
5840 /* If the data socket received something meanwhile,
5841 * that is good enough: peer is still alive. */
5842 if (time_after(connection->last_received, pre_recv_jif))
5843 continue;
5844 if (ping_timeout_active) {
5845 drbd_err(connection, "PingAck did not arrive in time.\n");
5846 goto reconnect;
5848 set_bit(SEND_PING, &connection->flags);
5849 continue;
5850 } else if (rv == -EINTR) {
5851 /* maybe drbd_thread_stop(): the while condition will notice.
5852 * maybe woken for send_ping: we'll send a ping above,
5853 * and change the rcvtimeo */
5854 flush_signals(current);
5855 continue;
5856 } else {
5857 drbd_err(connection, "sock_recvmsg returned %d\n", rv);
5858 goto reconnect;
5861 if (received == expect && cmd == NULL) {
5862 if (decode_header(connection, connection->meta.rbuf, &pi))
5863 goto reconnect;
5864 cmd = &ack_receiver_tbl[pi.cmd];
5865 if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) || !cmd->fn) {
5866 drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
5867 cmdname(pi.cmd), pi.cmd);
5868 goto disconnect;
5870 expect = header_size + cmd->pkt_size;
5871 if (pi.size != expect - header_size) {
5872 drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
5873 pi.cmd, pi.size);
5874 goto reconnect;
5877 if (received == expect) {
5878 bool err;
5880 err = cmd->fn(connection, &pi);
5881 if (err) {
5882 drbd_err(connection, "%pf failed\n", cmd->fn);
5883 goto reconnect;
5886 connection->last_received = jiffies;
5888 if (cmd == &ack_receiver_tbl[P_PING_ACK]) {
5889 set_idle_timeout(connection);
5890 ping_timeout_active = false;
5893 buf = connection->meta.rbuf;
5894 received = 0;
5895 expect = header_size;
5896 cmd = NULL;
5900 if (0) {
5901 reconnect:
5902 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5903 conn_md_sync(connection);
5905 if (0) {
5906 disconnect:
5907 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5910 drbd_info(connection, "ack_receiver terminated\n");
5912 return 0;
5915 void drbd_send_acks_wf(struct work_struct *ws)
5917 struct drbd_peer_device *peer_device =
5918 container_of(ws, struct drbd_peer_device, send_acks_work);
5919 struct drbd_connection *connection = peer_device->connection;
5920 struct drbd_device *device = peer_device->device;
5921 struct net_conf *nc;
5922 int tcp_cork, err;
5924 rcu_read_lock();
5925 nc = rcu_dereference(connection->net_conf);
5926 tcp_cork = nc->tcp_cork;
5927 rcu_read_unlock();
5929 if (tcp_cork)
5930 drbd_tcp_cork(connection->meta.socket);
5932 err = drbd_finish_peer_reqs(device);
5933 kref_put(&device->kref, drbd_destroy_device);
5934 /* get is in drbd_endio_write_sec_final(). That is necessary to keep the
5935 struct work_struct send_acks_work alive, which is in the peer_device object */
5937 if (err) {
5938 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5939 return;
5942 if (tcp_cork)
5943 drbd_tcp_uncork(connection->meta.socket);
5945 return;