Merge branch 'fixes' of git://git.kernel.org/pub/scm/linux/kernel/git/evalenti/linux...
[linux/fpc-iii.git] / drivers / block / drbd / drbd_receiver.c
blob050aaa1c03504e7bb1f90be3628997385b8fe4cc
1 /*
2 drbd_receiver.c
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26 #include <linux/module.h>
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_protocol.h"
48 #include "drbd_req.h"
49 #include "drbd_vli.h"
/* Feature flags supported by this implementation; currently only FF_TRIM.
 * NOTE(review): presumably advertised in drbd_do_features() -- confirm. */
#define PRO_FEATURES	(FF_TRIM)
53 struct packet_info {
54 enum drbd_packet cmd;
55 unsigned int size;
56 unsigned int vnr;
57 void *data;
/* Result type of drbd_may_finish_epoch().
 * NOTE(review): the meaning of the individual values is defined by that
 * function, which is not visible here -- confirm before relying on it. */
enum finish_epoch {
	FE_STILL_LIVE,
	FE_DESTROYED,
	FE_RECYCLED,
};
/* Forward declarations. */
static int drbd_do_features(struct drbd_connection *connection);
static int drbd_do_auth(struct drbd_connection *connection);
static int drbd_disconnected(struct drbd_peer_device *);
static void conn_wait_active_ee_empty(struct drbd_connection *connection);
static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *,
					       struct drbd_epoch *,
					       enum epoch_event);
static int e_end_block(struct drbd_work *, int);

/* Allocation flags used by __drbd_alloc_pages(): highmem pages are
 * acceptable and allocation failures are not to be warned about. */
#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
/*
 * Some helper functions to deal with singly linked page lists,
 * page->private being our "next" pointer.
 */
81 /* If at least n pages are linked at head, get n pages off.
82 * Otherwise, don't modify head, and return NULL.
83 * Locking is the responsibility of the caller.
85 static struct page *page_chain_del(struct page **head, int n)
87 struct page *page;
88 struct page *tmp;
90 BUG_ON(!n);
91 BUG_ON(!head);
93 page = *head;
95 if (!page)
96 return NULL;
98 while (page) {
99 tmp = page_chain_next(page);
100 if (--n == 0)
101 break; /* found sufficient pages */
102 if (tmp == NULL)
103 /* insufficient pages, don't use any of them. */
104 return NULL;
105 page = tmp;
108 /* add end of list marker for the returned list */
109 set_page_private(page, 0);
110 /* actual return value, and adjustment of head */
111 page = *head;
112 *head = tmp;
113 return page;
116 /* may be used outside of locks to find the tail of a (usually short)
117 * "private" page chain, before adding it back to a global chain head
118 * with page_chain_add() under a spinlock. */
119 static struct page *page_chain_tail(struct page *page, int *len)
121 struct page *tmp;
122 int i = 1;
123 while ((tmp = page_chain_next(page)))
124 ++i, page = tmp;
125 if (len)
126 *len = i;
127 return page;
130 static int page_chain_free(struct page *page)
132 struct page *tmp;
133 int i = 0;
134 page_chain_for_each_safe(page, tmp) {
135 put_page(page);
136 ++i;
138 return i;
141 static void page_chain_add(struct page **head,
142 struct page *chain_first, struct page *chain_last)
144 #if 1
145 struct page *tmp;
146 tmp = page_chain_tail(chain_first, NULL);
147 BUG_ON(tmp != chain_last);
148 #endif
150 /* add chain to head */
151 set_page_private(chain_last, (unsigned long)*head);
152 *head = chain_first;
155 static struct page *__drbd_alloc_pages(struct drbd_device *device,
156 unsigned int number)
158 struct page *page = NULL;
159 struct page *tmp = NULL;
160 unsigned int i = 0;
162 /* Yes, testing drbd_pp_vacant outside the lock is racy.
163 * So what. It saves a spin_lock. */
164 if (drbd_pp_vacant >= number) {
165 spin_lock(&drbd_pp_lock);
166 page = page_chain_del(&drbd_pp_pool, number);
167 if (page)
168 drbd_pp_vacant -= number;
169 spin_unlock(&drbd_pp_lock);
170 if (page)
171 return page;
174 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
175 * "criss-cross" setup, that might cause write-out on some other DRBD,
176 * which in turn might block on the other node at this very place. */
177 for (i = 0; i < number; i++) {
178 tmp = alloc_page(GFP_TRY);
179 if (!tmp)
180 break;
181 set_page_private(tmp, (unsigned long)page);
182 page = tmp;
185 if (i == number)
186 return page;
188 /* Not enough pages immediately available this time.
189 * No need to jump around here, drbd_alloc_pages will retry this
190 * function "soon". */
191 if (page) {
192 tmp = page_chain_tail(page, NULL);
193 spin_lock(&drbd_pp_lock);
194 page_chain_add(&drbd_pp_pool, page, tmp);
195 drbd_pp_vacant += i;
196 spin_unlock(&drbd_pp_lock);
198 return NULL;
201 static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
202 struct list_head *to_be_freed)
204 struct drbd_peer_request *peer_req, *tmp;
206 /* The EEs are always appended to the end of the list. Since
207 they are sent in order over the wire, they have to finish
208 in order. As soon as we see the first not finished we can
209 stop to examine the list... */
211 list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
212 if (drbd_peer_req_has_active_page(peer_req))
213 break;
214 list_move(&peer_req->w.list, to_be_freed);
218 static void drbd_reclaim_net_peer_reqs(struct drbd_device *device)
220 LIST_HEAD(reclaimed);
221 struct drbd_peer_request *peer_req, *t;
223 spin_lock_irq(&device->resource->req_lock);
224 reclaim_finished_net_peer_reqs(device, &reclaimed);
225 spin_unlock_irq(&device->resource->req_lock);
226 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
227 drbd_free_net_peer_req(device, peer_req);
230 static void conn_reclaim_net_peer_reqs(struct drbd_connection *connection)
232 struct drbd_peer_device *peer_device;
233 int vnr;
235 rcu_read_lock();
236 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
237 struct drbd_device *device = peer_device->device;
238 if (!atomic_read(&device->pp_in_use_by_net))
239 continue;
241 kref_get(&device->kref);
242 rcu_read_unlock();
243 drbd_reclaim_net_peer_reqs(device);
244 kref_put(&device->kref, drbd_destroy_device);
245 rcu_read_lock();
247 rcu_read_unlock();
251 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
252 * @device: DRBD device.
253 * @number: number of pages requested
254 * @retry: whether to retry, if not enough pages are available right now
256 * Tries to allocate number pages, first from our own page pool, then from
257 * the kernel.
258 * Possibly retry until DRBD frees sufficient pages somewhere else.
260 * If this allocation would exceed the max_buffers setting, we throttle
261 * allocation (schedule_timeout) to give the system some room to breathe.
263 * We do not use max-buffers as hard limit, because it could lead to
264 * congestion and further to a distributed deadlock during online-verify or
265 * (checksum based) resync, if the max-buffers, socket buffer sizes and
266 * resync-rate settings are mis-configured.
268 * Returns a page chain linked via page->private.
270 struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
271 bool retry)
273 struct drbd_device *device = peer_device->device;
274 struct page *page = NULL;
275 struct net_conf *nc;
276 DEFINE_WAIT(wait);
277 unsigned int mxb;
279 rcu_read_lock();
280 nc = rcu_dereference(peer_device->connection->net_conf);
281 mxb = nc ? nc->max_buffers : 1000000;
282 rcu_read_unlock();
284 if (atomic_read(&device->pp_in_use) < mxb)
285 page = __drbd_alloc_pages(device, number);
287 /* Try to keep the fast path fast, but occasionally we need
288 * to reclaim the pages we lended to the network stack. */
289 if (page && atomic_read(&device->pp_in_use_by_net) > 512)
290 drbd_reclaim_net_peer_reqs(device);
292 while (page == NULL) {
293 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
295 drbd_reclaim_net_peer_reqs(device);
297 if (atomic_read(&device->pp_in_use) < mxb) {
298 page = __drbd_alloc_pages(device, number);
299 if (page)
300 break;
303 if (!retry)
304 break;
306 if (signal_pending(current)) {
307 drbd_warn(device, "drbd_alloc_pages interrupted!\n");
308 break;
311 if (schedule_timeout(HZ/10) == 0)
312 mxb = UINT_MAX;
314 finish_wait(&drbd_pp_wait, &wait);
316 if (page)
317 atomic_add(number, &device->pp_in_use);
318 return page;
321 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
322 * Is also used from inside an other spin_lock_irq(&resource->req_lock);
323 * Either links the page chain back to the global pool,
324 * or returns all pages to the system. */
325 static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
327 atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
328 int i;
330 if (page == NULL)
331 return;
333 if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
334 i = page_chain_free(page);
335 else {
336 struct page *tmp;
337 tmp = page_chain_tail(page, &i);
338 spin_lock(&drbd_pp_lock);
339 page_chain_add(&drbd_pp_pool, page, tmp);
340 drbd_pp_vacant += i;
341 spin_unlock(&drbd_pp_lock);
343 i = atomic_sub_return(i, a);
344 if (i < 0)
345 drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
346 is_net ? "pp_in_use_by_net" : "pp_in_use", i);
347 wake_up(&drbd_pp_wait);
/*
 * You need to hold the req_lock:
 *  _drbd_wait_ee_list_empty()
 *
 * You must not have the req_lock:
 *  drbd_free_peer_req()
 *  drbd_alloc_peer_req()
 *  drbd_free_peer_reqs()
 *  drbd_ee_fix_bhs()
 *  drbd_finish_peer_reqs()
 *  drbd_clear_done_ee()
 *  drbd_wait_ee_list_empty()
 */
364 struct drbd_peer_request *
365 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
366 unsigned int data_size, bool has_payload, gfp_t gfp_mask) __must_hold(local)
368 struct drbd_device *device = peer_device->device;
369 struct drbd_peer_request *peer_req;
370 struct page *page = NULL;
371 unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
373 if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
374 return NULL;
376 peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
377 if (!peer_req) {
378 if (!(gfp_mask & __GFP_NOWARN))
379 drbd_err(device, "%s: allocation failed\n", __func__);
380 return NULL;
383 if (has_payload && data_size) {
384 page = drbd_alloc_pages(peer_device, nr_pages,
385 gfpflags_allow_blocking(gfp_mask));
386 if (!page)
387 goto fail;
390 memset(peer_req, 0, sizeof(*peer_req));
391 INIT_LIST_HEAD(&peer_req->w.list);
392 drbd_clear_interval(&peer_req->i);
393 peer_req->i.size = data_size;
394 peer_req->i.sector = sector;
395 peer_req->submit_jif = jiffies;
396 peer_req->peer_device = peer_device;
397 peer_req->pages = page;
399 * The block_id is opaque to the receiver. It is not endianness
400 * converted, and sent back to the sender unchanged.
402 peer_req->block_id = id;
404 return peer_req;
406 fail:
407 mempool_free(peer_req, drbd_ee_mempool);
408 return NULL;
411 void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
412 int is_net)
414 might_sleep();
415 if (peer_req->flags & EE_HAS_DIGEST)
416 kfree(peer_req->digest);
417 drbd_free_pages(device, peer_req->pages, is_net);
418 D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
419 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
420 if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
421 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
422 drbd_al_complete_io(device, &peer_req->i);
424 mempool_free(peer_req, drbd_ee_mempool);
427 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
429 LIST_HEAD(work_list);
430 struct drbd_peer_request *peer_req, *t;
431 int count = 0;
432 int is_net = list == &device->net_ee;
434 spin_lock_irq(&device->resource->req_lock);
435 list_splice_init(list, &work_list);
436 spin_unlock_irq(&device->resource->req_lock);
438 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
439 __drbd_free_peer_req(device, peer_req, is_net);
440 count++;
442 return count;
446 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
448 static int drbd_finish_peer_reqs(struct drbd_device *device)
450 LIST_HEAD(work_list);
451 LIST_HEAD(reclaimed);
452 struct drbd_peer_request *peer_req, *t;
453 int err = 0;
455 spin_lock_irq(&device->resource->req_lock);
456 reclaim_finished_net_peer_reqs(device, &reclaimed);
457 list_splice_init(&device->done_ee, &work_list);
458 spin_unlock_irq(&device->resource->req_lock);
460 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
461 drbd_free_net_peer_req(device, peer_req);
463 /* possible callbacks here:
464 * e_end_block, and e_end_resync_block, e_send_superseded.
465 * all ignore the last argument.
467 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
468 int err2;
470 /* list_del not necessary, next/prev members not touched */
471 err2 = peer_req->w.cb(&peer_req->w, !!err);
472 if (!err)
473 err = err2;
474 drbd_free_peer_req(device, peer_req);
476 wake_up(&device->ee_wait);
478 return err;
481 static void _drbd_wait_ee_list_empty(struct drbd_device *device,
482 struct list_head *head)
484 DEFINE_WAIT(wait);
486 /* avoids spin_lock/unlock
487 * and calling prepare_to_wait in the fast path */
488 while (!list_empty(head)) {
489 prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
490 spin_unlock_irq(&device->resource->req_lock);
491 io_schedule();
492 finish_wait(&device->ee_wait, &wait);
493 spin_lock_irq(&device->resource->req_lock);
497 static void drbd_wait_ee_list_empty(struct drbd_device *device,
498 struct list_head *head)
500 spin_lock_irq(&device->resource->req_lock);
501 _drbd_wait_ee_list_empty(device, head);
502 spin_unlock_irq(&device->resource->req_lock);
505 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
507 struct kvec iov = {
508 .iov_base = buf,
509 .iov_len = size,
511 struct msghdr msg = {
512 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
514 return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags);
517 static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
519 int rv;
521 rv = drbd_recv_short(connection->data.socket, buf, size, 0);
523 if (rv < 0) {
524 if (rv == -ECONNRESET)
525 drbd_info(connection, "sock was reset by peer\n");
526 else if (rv != -ERESTARTSYS)
527 drbd_err(connection, "sock_recvmsg returned %d\n", rv);
528 } else if (rv == 0) {
529 if (test_bit(DISCONNECT_SENT, &connection->flags)) {
530 long t;
531 rcu_read_lock();
532 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
533 rcu_read_unlock();
535 t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
537 if (t)
538 goto out;
540 drbd_info(connection, "sock was shut down by peer\n");
543 if (rv != size)
544 conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
546 out:
547 return rv;
550 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
552 int err;
554 err = drbd_recv(connection, buf, size);
555 if (err != size) {
556 if (err >= 0)
557 err = -EIO;
558 } else
559 err = 0;
560 return err;
563 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
565 int err;
567 err = drbd_recv_all(connection, buf, size);
568 if (err && !signal_pending(current))
569 drbd_warn(connection, "short read (expected size %d)\n", (int)size);
570 return err;
573 /* quoting tcp(7):
574 * On individual connections, the socket buffer size must be set prior to the
575 * listen(2) or connect(2) calls in order to have it take effect.
576 * This is our wrapper to do so.
578 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
579 unsigned int rcv)
581 /* open coded SO_SNDBUF, SO_RCVBUF */
582 if (snd) {
583 sock->sk->sk_sndbuf = snd;
584 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
586 if (rcv) {
587 sock->sk->sk_rcvbuf = rcv;
588 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
/* Make one attempt at an outgoing TCP connection to the peer.
 *
 * Binds to the configured local address (port 0) and connects to the
 * configured peer address, using connect_int as send/receive timeout.
 *
 * Returns the connected socket, or NULL.  On errors other than the
 * "peer not reachable (yet)" class, and on any error before the connect
 * call itself, the connection is forced to C_DISCONNECTING.
 */
static struct socket *drbd_try_connect(struct drbd_connection *connection)
{
	const char *what;
	struct socket *sock;
	struct sockaddr_in6 src_in6;
	struct sockaddr_in6 peer_in6;
	struct net_conf *nc;
	int err, peer_addr_len, my_addr_len;
	int sndbuf_size, rcvbuf_size, connect_int;
	int disconnect_on_error = 1;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return NULL;
	}
	sndbuf_size = nc->sndbuf_size;
	rcvbuf_size = nc->rcvbuf_size;
	connect_int = nc->connect_int;
	rcu_read_unlock();

	my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
	memcpy(&src_in6, &connection->my_addr, my_addr_len);

	if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
		src_in6.sin6_port = 0;
	else
		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

	/* bound the copy by the buffer we actually copy into */
	peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(peer_in6));
	memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);

	what = "sock_create_kern";
	err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &sock);
	if (err < 0) {
		sock = NULL;
		goto out;
	}

	sock->sk->sk_rcvtimeo =
	sock->sk->sk_sndtimeo = connect_int * HZ;
	drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);

	/* explicitly bind to the configured IP as source IP
	 * for the outgoing connections.
	 * This is needed for multihomed hosts and to be
	 * able to use lo: interfaces for drbd.
	 * Make sure to use 0 as port number, so linux selects
	 * a free one dynamically.
	 */
	what = "bind before connect";
	err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
	if (err < 0)
		goto out;

	/* connect may fail, peer not yet available.
	 * stay C_WF_CONNECTION, don't go Disconnecting! */
	disconnect_on_error = 0;
	what = "connect";
	err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);

out:
	if (err < 0) {
		if (sock) {
			sock_release(sock);
			sock = NULL;
		}
		switch (-err) {
			/* timeout, busy, signal pending */
		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
		case EINTR: case ERESTARTSYS:
			/* peer not (yet) available, network problem */
		case ECONNREFUSED: case ENETUNREACH:
		case EHOSTDOWN:    case EHOSTUNREACH:
			disconnect_on_error = 0;
			break;
		default:
			drbd_err(connection, "%s failed, err = %d\n", what, err);
		}
		if (disconnect_on_error)
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
	}

	return sock;
}
680 struct accept_wait_data {
681 struct drbd_connection *connection;
682 struct socket *s_listen;
683 struct completion door_bell;
684 void (*original_sk_state_change)(struct sock *sk);
688 static void drbd_incoming_connection(struct sock *sk)
690 struct accept_wait_data *ad = sk->sk_user_data;
691 void (*state_change)(struct sock *sk);
693 state_change = ad->original_sk_state_change;
694 if (sk->sk_state == TCP_ESTABLISHED)
695 complete(&ad->door_bell);
696 state_change(sk);
699 static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
701 int err, sndbuf_size, rcvbuf_size, my_addr_len;
702 struct sockaddr_in6 my_addr;
703 struct socket *s_listen;
704 struct net_conf *nc;
705 const char *what;
707 rcu_read_lock();
708 nc = rcu_dereference(connection->net_conf);
709 if (!nc) {
710 rcu_read_unlock();
711 return -EIO;
713 sndbuf_size = nc->sndbuf_size;
714 rcvbuf_size = nc->rcvbuf_size;
715 rcu_read_unlock();
717 my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
718 memcpy(&my_addr, &connection->my_addr, my_addr_len);
720 what = "sock_create_kern";
721 err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
722 SOCK_STREAM, IPPROTO_TCP, &s_listen);
723 if (err) {
724 s_listen = NULL;
725 goto out;
728 s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
729 drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
731 what = "bind before listen";
732 err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
733 if (err < 0)
734 goto out;
736 ad->s_listen = s_listen;
737 write_lock_bh(&s_listen->sk->sk_callback_lock);
738 ad->original_sk_state_change = s_listen->sk->sk_state_change;
739 s_listen->sk->sk_state_change = drbd_incoming_connection;
740 s_listen->sk->sk_user_data = ad;
741 write_unlock_bh(&s_listen->sk->sk_callback_lock);
743 what = "listen";
744 err = s_listen->ops->listen(s_listen, 5);
745 if (err < 0)
746 goto out;
748 return 0;
749 out:
750 if (s_listen)
751 sock_release(s_listen);
752 if (err < 0) {
753 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
754 drbd_err(connection, "%s failed, err = %d\n", what, err);
755 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
759 return -EIO;
762 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
764 write_lock_bh(&sk->sk_callback_lock);
765 sk->sk_state_change = ad->original_sk_state_change;
766 sk->sk_user_data = NULL;
767 write_unlock_bh(&sk->sk_callback_lock);
770 static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
772 int timeo, connect_int, err = 0;
773 struct socket *s_estab = NULL;
774 struct net_conf *nc;
776 rcu_read_lock();
777 nc = rcu_dereference(connection->net_conf);
778 if (!nc) {
779 rcu_read_unlock();
780 return NULL;
782 connect_int = nc->connect_int;
783 rcu_read_unlock();
785 timeo = connect_int * HZ;
786 /* 28.5% random jitter */
787 timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
789 err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
790 if (err <= 0)
791 return NULL;
793 err = kernel_accept(ad->s_listen, &s_estab, 0);
794 if (err < 0) {
795 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
796 drbd_err(connection, "accept failed, err = %d\n", err);
797 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
801 if (s_estab)
802 unregister_state_change(s_estab->sk, ad);
804 return s_estab;
/* Defined further down; needed by receive_first_packet(). */
static int decode_header(struct drbd_connection *connection, void *header,
			 struct packet_info *pi);
809 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
810 enum drbd_packet cmd)
812 if (!conn_prepare_command(connection, sock))
813 return -EIO;
814 return conn_send_command(connection, sock, cmd, 0, NULL, 0);
817 static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
819 unsigned int header_size = drbd_header_size(connection);
820 struct packet_info pi;
821 struct net_conf *nc;
822 int err;
824 rcu_read_lock();
825 nc = rcu_dereference(connection->net_conf);
826 if (!nc) {
827 rcu_read_unlock();
828 return -EIO;
830 sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
831 rcu_read_unlock();
833 err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
834 if (err != header_size) {
835 if (err >= 0)
836 err = -EIO;
837 return err;
839 err = decode_header(connection, connection->data.rbuf, &pi);
840 if (err)
841 return err;
842 return pi.cmd;
846 * drbd_socket_okay() - Free the socket if its connection is not okay
847 * @sock: pointer to the pointer to the socket.
849 static bool drbd_socket_okay(struct socket **sock)
851 int rr;
852 char tb[4];
854 if (!*sock)
855 return false;
857 rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
859 if (rr > 0 || rr == -EAGAIN) {
860 return true;
861 } else {
862 sock_release(*sock);
863 *sock = NULL;
864 return false;
868 static bool connection_established(struct drbd_connection *connection,
869 struct socket **sock1,
870 struct socket **sock2)
872 struct net_conf *nc;
873 int timeout;
874 bool ok;
876 if (!*sock1 || !*sock2)
877 return false;
879 rcu_read_lock();
880 nc = rcu_dereference(connection->net_conf);
881 timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
882 rcu_read_unlock();
883 schedule_timeout_interruptible(timeout);
885 ok = drbd_socket_okay(sock1);
886 ok = drbd_socket_okay(sock2) && ok;
888 return ok;
891 /* Gets called if a connection is established, or if a new minor gets created
892 in a connection */
893 int drbd_connected(struct drbd_peer_device *peer_device)
895 struct drbd_device *device = peer_device->device;
896 int err;
898 atomic_set(&device->packet_seq, 0);
899 device->peer_seq = 0;
901 device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
902 &peer_device->connection->cstate_mutex :
903 &device->own_state_mutex;
905 err = drbd_send_sync_param(peer_device);
906 if (!err)
907 err = drbd_send_sizes(peer_device, 0, 0);
908 if (!err)
909 err = drbd_send_uuids(peer_device);
910 if (!err)
911 err = drbd_send_current_state(peer_device);
912 clear_bit(USE_DEGR_WFC_T, &device->flags);
913 clear_bit(RESIZE_PENDING, &device->flags);
914 atomic_set(&device->ap_in_flight, 0);
915 mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
916 return err;
920 * return values:
921 * 1 yes, we have a valid connection
922 * 0 oops, did not work out, please try again
923 * -1 peer talks different language,
924 * no point in trying again, please go standalone.
925 * -2 We do not have a network config...
927 static int conn_connect(struct drbd_connection *connection)
929 struct drbd_socket sock, msock;
930 struct drbd_peer_device *peer_device;
931 struct net_conf *nc;
932 int vnr, timeout, h;
933 bool discard_my_data, ok;
934 enum drbd_state_rv rv;
935 struct accept_wait_data ad = {
936 .connection = connection,
937 .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
940 clear_bit(DISCONNECT_SENT, &connection->flags);
941 if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
942 return -2;
944 mutex_init(&sock.mutex);
945 sock.sbuf = connection->data.sbuf;
946 sock.rbuf = connection->data.rbuf;
947 sock.socket = NULL;
948 mutex_init(&msock.mutex);
949 msock.sbuf = connection->meta.sbuf;
950 msock.rbuf = connection->meta.rbuf;
951 msock.socket = NULL;
953 /* Assume that the peer only understands protocol 80 until we know better. */
954 connection->agreed_pro_version = 80;
956 if (prepare_listen_socket(connection, &ad))
957 return 0;
959 do {
960 struct socket *s;
962 s = drbd_try_connect(connection);
963 if (s) {
964 if (!sock.socket) {
965 sock.socket = s;
966 send_first_packet(connection, &sock, P_INITIAL_DATA);
967 } else if (!msock.socket) {
968 clear_bit(RESOLVE_CONFLICTS, &connection->flags);
969 msock.socket = s;
970 send_first_packet(connection, &msock, P_INITIAL_META);
971 } else {
972 drbd_err(connection, "Logic error in conn_connect()\n");
973 goto out_release_sockets;
977 if (connection_established(connection, &sock.socket, &msock.socket))
978 break;
980 retry:
981 s = drbd_wait_for_connect(connection, &ad);
982 if (s) {
983 int fp = receive_first_packet(connection, s);
984 drbd_socket_okay(&sock.socket);
985 drbd_socket_okay(&msock.socket);
986 switch (fp) {
987 case P_INITIAL_DATA:
988 if (sock.socket) {
989 drbd_warn(connection, "initial packet S crossed\n");
990 sock_release(sock.socket);
991 sock.socket = s;
992 goto randomize;
994 sock.socket = s;
995 break;
996 case P_INITIAL_META:
997 set_bit(RESOLVE_CONFLICTS, &connection->flags);
998 if (msock.socket) {
999 drbd_warn(connection, "initial packet M crossed\n");
1000 sock_release(msock.socket);
1001 msock.socket = s;
1002 goto randomize;
1004 msock.socket = s;
1005 break;
1006 default:
1007 drbd_warn(connection, "Error receiving initial packet\n");
1008 sock_release(s);
1009 randomize:
1010 if (prandom_u32() & 1)
1011 goto retry;
1015 if (connection->cstate <= C_DISCONNECTING)
1016 goto out_release_sockets;
1017 if (signal_pending(current)) {
1018 flush_signals(current);
1019 smp_rmb();
1020 if (get_t_state(&connection->receiver) == EXITING)
1021 goto out_release_sockets;
1024 ok = connection_established(connection, &sock.socket, &msock.socket);
1025 } while (!ok);
1027 if (ad.s_listen)
1028 sock_release(ad.s_listen);
1030 sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1031 msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1033 sock.socket->sk->sk_allocation = GFP_NOIO;
1034 msock.socket->sk->sk_allocation = GFP_NOIO;
1036 sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
1037 msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
1039 /* NOT YET ...
1040 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
1041 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1042 * first set it to the P_CONNECTION_FEATURES timeout,
1043 * which we set to 4x the configured ping_timeout. */
1044 rcu_read_lock();
1045 nc = rcu_dereference(connection->net_conf);
1047 sock.socket->sk->sk_sndtimeo =
1048 sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
1050 msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1051 timeout = nc->timeout * HZ / 10;
1052 discard_my_data = nc->discard_my_data;
1053 rcu_read_unlock();
1055 msock.socket->sk->sk_sndtimeo = timeout;
1057 /* we don't want delays.
1058 * we use TCP_CORK where appropriate, though */
1059 drbd_tcp_nodelay(sock.socket);
1060 drbd_tcp_nodelay(msock.socket);
1062 connection->data.socket = sock.socket;
1063 connection->meta.socket = msock.socket;
1064 connection->last_received = jiffies;
1066 h = drbd_do_features(connection);
1067 if (h <= 0)
1068 return h;
1070 if (connection->cram_hmac_tfm) {
1071 /* drbd_request_state(device, NS(conn, WFAuth)); */
1072 switch (drbd_do_auth(connection)) {
1073 case -1:
1074 drbd_err(connection, "Authentication of peer failed\n");
1075 return -1;
1076 case 0:
1077 drbd_err(connection, "Authentication of peer failed, trying again.\n");
1078 return 0;
1082 connection->data.socket->sk->sk_sndtimeo = timeout;
1083 connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1085 if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1086 return -1;
1088 /* Prevent a race between resync-handshake and
1089 * being promoted to Primary.
1091 * Grab and release the state mutex, so we know that any current
1092 * drbd_set_role() is finished, and any incoming drbd_set_role
1093 * will see the STATE_SENT flag, and wait for it to be cleared.
1095 idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1096 mutex_lock(peer_device->device->state_mutex);
1098 set_bit(STATE_SENT, &connection->flags);
1100 idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1101 mutex_unlock(peer_device->device->state_mutex);
1103 rcu_read_lock();
1104 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1105 struct drbd_device *device = peer_device->device;
1106 kref_get(&device->kref);
1107 rcu_read_unlock();
1109 if (discard_my_data)
1110 set_bit(DISCARD_MY_DATA, &device->flags);
1111 else
1112 clear_bit(DISCARD_MY_DATA, &device->flags);
1114 drbd_connected(peer_device);
1115 kref_put(&device->kref, drbd_destroy_device);
1116 rcu_read_lock();
1118 rcu_read_unlock();
1120 rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1121 if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1122 clear_bit(STATE_SENT, &connection->flags);
1123 return 0;
1126 drbd_thread_start(&connection->ack_receiver);
1127 /* opencoded create_singlethread_workqueue(),
1128 * to be able to use format string arguments */
1129 connection->ack_sender =
1130 alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name);
1131 if (!connection->ack_sender) {
1132 drbd_err(connection, "Failed to create workqueue ack_sender\n");
1133 return 0;
1136 mutex_lock(&connection->resource->conf_update);
1137 /* The discard_my_data flag is a single-shot modifier to the next
1138 * connection attempt, the handshake of which is now well underway.
1139 * No need for rcu style copying of the whole struct
1140 * just to clear a single value. */
1141 connection->net_conf->discard_my_data = 0;
1142 mutex_unlock(&connection->resource->conf_update);
1144 return h;
1146 out_release_sockets:
1147 if (ad.s_listen)
1148 sock_release(ad.s_listen);
1149 if (sock.socket)
1150 sock_release(sock.socket);
1151 if (msock.socket)
1152 sock_release(msock.socket);
1153 return -1;
1156 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1158 unsigned int header_size = drbd_header_size(connection);
1160 if (header_size == sizeof(struct p_header100) &&
1161 *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1162 struct p_header100 *h = header;
1163 if (h->pad != 0) {
1164 drbd_err(connection, "Header padding is not zero\n");
1165 return -EINVAL;
1167 pi->vnr = be16_to_cpu(h->volume);
1168 pi->cmd = be16_to_cpu(h->command);
1169 pi->size = be32_to_cpu(h->length);
1170 } else if (header_size == sizeof(struct p_header95) &&
1171 *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1172 struct p_header95 *h = header;
1173 pi->cmd = be16_to_cpu(h->command);
1174 pi->size = be32_to_cpu(h->length);
1175 pi->vnr = 0;
1176 } else if (header_size == sizeof(struct p_header80) &&
1177 *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1178 struct p_header80 *h = header;
1179 pi->cmd = be16_to_cpu(h->command);
1180 pi->size = be16_to_cpu(h->length);
1181 pi->vnr = 0;
1182 } else {
1183 drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1184 be32_to_cpu(*(__be32 *)header),
1185 connection->agreed_pro_version);
1186 return -EINVAL;
1188 pi->data = header + header_size;
1189 return 0;
1192 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1194 void *buffer = connection->data.rbuf;
1195 int err;
1197 err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1198 if (err)
1199 return err;
1201 err = decode_header(connection, buffer, pi);
1202 connection->last_received = jiffies;
1204 return err;
1207 static void drbd_flush(struct drbd_connection *connection)
1209 int rv;
1210 struct drbd_peer_device *peer_device;
1211 int vnr;
1213 if (connection->resource->write_ordering >= WO_BDEV_FLUSH) {
1214 rcu_read_lock();
1215 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1216 struct drbd_device *device = peer_device->device;
1218 if (!get_ldev(device))
1219 continue;
1220 kref_get(&device->kref);
1221 rcu_read_unlock();
1223 /* Right now, we have only this one synchronous code path
1224 * for flushes between request epochs.
1225 * We may want to make those asynchronous,
1226 * or at least parallelize the flushes to the volume devices.
1228 device->flush_jif = jiffies;
1229 set_bit(FLUSH_PENDING, &device->flags);
1230 rv = blkdev_issue_flush(device->ldev->backing_bdev,
1231 GFP_NOIO, NULL);
1232 clear_bit(FLUSH_PENDING, &device->flags);
1233 if (rv) {
1234 drbd_info(device, "local disk flush failed with status %d\n", rv);
1235 /* would rather check on EOPNOTSUPP, but that is not reliable.
1236 * don't try again for ANY return value != 0
1237 * if (rv == -EOPNOTSUPP) */
1238 drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
1240 put_ldev(device);
1241 kref_put(&device->kref, drbd_destroy_device);
1243 rcu_read_lock();
1244 if (rv)
1245 break;
1247 rcu_read_unlock();
1252 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1253 * @device: DRBD device.
1254 * @epoch: Epoch object.
1255 * @ev: Epoch event.
1257 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1258 struct drbd_epoch *epoch,
1259 enum epoch_event ev)
1261 int epoch_size;
1262 struct drbd_epoch *next_epoch;
1263 enum finish_epoch rv = FE_STILL_LIVE;
1265 spin_lock(&connection->epoch_lock);
1266 do {
1267 next_epoch = NULL;
1269 epoch_size = atomic_read(&epoch->epoch_size);
1271 switch (ev & ~EV_CLEANUP) {
1272 case EV_PUT:
1273 atomic_dec(&epoch->active);
1274 break;
1275 case EV_GOT_BARRIER_NR:
1276 set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1277 break;
1278 case EV_BECAME_LAST:
1279 /* nothing to do*/
1280 break;
1283 if (epoch_size != 0 &&
1284 atomic_read(&epoch->active) == 0 &&
1285 (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1286 if (!(ev & EV_CLEANUP)) {
1287 spin_unlock(&connection->epoch_lock);
1288 drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1289 spin_lock(&connection->epoch_lock);
1291 #if 0
1292 /* FIXME: dec unacked on connection, once we have
1293 * something to count pending connection packets in. */
1294 if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1295 dec_unacked(epoch->connection);
1296 #endif
1298 if (connection->current_epoch != epoch) {
1299 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1300 list_del(&epoch->list);
1301 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1302 connection->epochs--;
1303 kfree(epoch);
1305 if (rv == FE_STILL_LIVE)
1306 rv = FE_DESTROYED;
1307 } else {
1308 epoch->flags = 0;
1309 atomic_set(&epoch->epoch_size, 0);
1310 /* atomic_set(&epoch->active, 0); is already zero */
1311 if (rv == FE_STILL_LIVE)
1312 rv = FE_RECYCLED;
1316 if (!next_epoch)
1317 break;
1319 epoch = next_epoch;
1320 } while (1);
1322 spin_unlock(&connection->epoch_lock);
1324 return rv;
1327 static enum write_ordering_e
1328 max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
1330 struct disk_conf *dc;
1332 dc = rcu_dereference(bdev->disk_conf);
1334 if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
1335 wo = WO_DRAIN_IO;
1336 if (wo == WO_DRAIN_IO && !dc->disk_drain)
1337 wo = WO_NONE;
1339 return wo;
1343 * drbd_bump_write_ordering() - Fall back to an other write ordering method
1344 * @connection: DRBD connection.
1345 * @wo: Write ordering method to try.
1347 void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
1348 enum write_ordering_e wo)
1350 struct drbd_device *device;
1351 enum write_ordering_e pwo;
1352 int vnr;
1353 static char *write_ordering_str[] = {
1354 [WO_NONE] = "none",
1355 [WO_DRAIN_IO] = "drain",
1356 [WO_BDEV_FLUSH] = "flush",
1359 pwo = resource->write_ordering;
1360 if (wo != WO_BDEV_FLUSH)
1361 wo = min(pwo, wo);
1362 rcu_read_lock();
1363 idr_for_each_entry(&resource->devices, device, vnr) {
1364 if (get_ldev(device)) {
1365 wo = max_allowed_wo(device->ldev, wo);
1366 if (device->ldev == bdev)
1367 bdev = NULL;
1368 put_ldev(device);
1372 if (bdev)
1373 wo = max_allowed_wo(bdev, wo);
1375 rcu_read_unlock();
1377 resource->write_ordering = wo;
1378 if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH)
1379 drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
1383 * drbd_submit_peer_request()
1384 * @device: DRBD device.
1385 * @peer_req: peer request
1386 * @rw: flag field, see bio->bi_rw
1388 * May spread the pages to multiple bios,
1389 * depending on bio_add_page restrictions.
1391 * Returns 0 if all bios have been submitted,
1392 * -ENOMEM if we could not allocate enough bios,
1393 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1394 * single page to an empty bio (which should never happen and likely indicates
1395 * that the lower level IO stack is in some way broken). This has been observed
1396 * on certain Xen deployments.
1398 /* TODO allocate from our own bio_set. */
1399 int drbd_submit_peer_request(struct drbd_device *device,
1400 struct drbd_peer_request *peer_req,
1401 const unsigned rw, const int fault_type)
1403 struct bio *bios = NULL;
1404 struct bio *bio;
1405 struct page *page = peer_req->pages;
1406 sector_t sector = peer_req->i.sector;
1407 unsigned data_size = peer_req->i.size;
1408 unsigned n_bios = 0;
1409 unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
1410 int err = -ENOMEM;
1412 if (peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) {
1413 /* wait for all pending IO completions, before we start
1414 * zeroing things out. */
1415 conn_wait_active_ee_empty(peer_req->peer_device->connection);
1416 /* add it to the active list now,
1417 * so we can find it to present it in debugfs */
1418 peer_req->submit_jif = jiffies;
1419 peer_req->flags |= EE_SUBMITTED;
1420 spin_lock_irq(&device->resource->req_lock);
1421 list_add_tail(&peer_req->w.list, &device->active_ee);
1422 spin_unlock_irq(&device->resource->req_lock);
1423 if (blkdev_issue_zeroout(device->ldev->backing_bdev,
1424 sector, data_size >> 9, GFP_NOIO, false))
1425 peer_req->flags |= EE_WAS_ERROR;
1426 drbd_endio_write_sec_final(peer_req);
1427 return 0;
1430 /* Discards don't have any payload.
1431 * But the scsi layer still expects a bio_vec it can use internally,
1432 * see sd_setup_discard_cmnd() and blk_add_request_payload(). */
1433 if (peer_req->flags & EE_IS_TRIM)
1434 nr_pages = 1;
1436 /* In most cases, we will only need one bio. But in case the lower
1437 * level restrictions happen to be different at this offset on this
1438 * side than those of the sending peer, we may need to submit the
1439 * request in more than one bio.
1441 * Plain bio_alloc is good enough here, this is no DRBD internally
1442 * generated bio, but a bio allocated on behalf of the peer.
1444 next_bio:
1445 bio = bio_alloc(GFP_NOIO, nr_pages);
1446 if (!bio) {
1447 drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
1448 goto fail;
1450 /* > peer_req->i.sector, unless this is the first bio */
1451 bio->bi_iter.bi_sector = sector;
1452 bio->bi_bdev = device->ldev->backing_bdev;
1453 bio->bi_rw = rw;
1454 bio->bi_private = peer_req;
1455 bio->bi_end_io = drbd_peer_request_endio;
1457 bio->bi_next = bios;
1458 bios = bio;
1459 ++n_bios;
1461 if (rw & REQ_DISCARD) {
1462 bio->bi_iter.bi_size = data_size;
1463 goto submit;
1466 page_chain_for_each(page) {
1467 unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
1468 if (!bio_add_page(bio, page, len, 0)) {
1469 /* A single page must always be possible!
1470 * But in case it fails anyways,
1471 * we deal with it, and complain (below). */
1472 if (bio->bi_vcnt == 0) {
1473 drbd_err(device,
1474 "bio_add_page failed for len=%u, "
1475 "bi_vcnt=0 (bi_sector=%llu)\n",
1476 len, (uint64_t)bio->bi_iter.bi_sector);
1477 err = -ENOSPC;
1478 goto fail;
1480 goto next_bio;
1482 data_size -= len;
1483 sector += len >> 9;
1484 --nr_pages;
1486 D_ASSERT(device, data_size == 0);
1487 submit:
1488 D_ASSERT(device, page == NULL);
1490 atomic_set(&peer_req->pending_bios, n_bios);
1491 /* for debugfs: update timestamp, mark as submitted */
1492 peer_req->submit_jif = jiffies;
1493 peer_req->flags |= EE_SUBMITTED;
1494 do {
1495 bio = bios;
1496 bios = bios->bi_next;
1497 bio->bi_next = NULL;
1499 drbd_generic_make_request(device, fault_type, bio);
1500 } while (bios);
1501 return 0;
1503 fail:
1504 while (bios) {
1505 bio = bios;
1506 bios = bios->bi_next;
1507 bio_put(bio);
1509 return err;
1512 static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1513 struct drbd_peer_request *peer_req)
1515 struct drbd_interval *i = &peer_req->i;
1517 drbd_remove_interval(&device->write_requests, i);
1518 drbd_clear_interval(i);
1520 /* Wake up any processes waiting for this peer request to complete. */
1521 if (i->waiting)
1522 wake_up(&device->misc_wait);
1525 static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1527 struct drbd_peer_device *peer_device;
1528 int vnr;
1530 rcu_read_lock();
1531 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1532 struct drbd_device *device = peer_device->device;
1534 kref_get(&device->kref);
1535 rcu_read_unlock();
1536 drbd_wait_ee_list_empty(device, &device->active_ee);
1537 kref_put(&device->kref, drbd_destroy_device);
1538 rcu_read_lock();
1540 rcu_read_unlock();
1543 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1545 int rv;
1546 struct p_barrier *p = pi->data;
1547 struct drbd_epoch *epoch;
1549 /* FIXME these are unacked on connection,
1550 * not a specific (peer)device.
1552 connection->current_epoch->barrier_nr = p->barrier;
1553 connection->current_epoch->connection = connection;
1554 rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1556 /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1557 * the activity log, which means it would not be resynced in case the
1558 * R_PRIMARY crashes now.
1559 * Therefore we must send the barrier_ack after the barrier request was
1560 * completed. */
1561 switch (connection->resource->write_ordering) {
1562 case WO_NONE:
1563 if (rv == FE_RECYCLED)
1564 return 0;
1566 /* receiver context, in the writeout path of the other node.
1567 * avoid potential distributed deadlock */
1568 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1569 if (epoch)
1570 break;
1571 else
1572 drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1573 /* Fall through */
1575 case WO_BDEV_FLUSH:
1576 case WO_DRAIN_IO:
1577 conn_wait_active_ee_empty(connection);
1578 drbd_flush(connection);
1580 if (atomic_read(&connection->current_epoch->epoch_size)) {
1581 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1582 if (epoch)
1583 break;
1586 return 0;
1587 default:
1588 drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
1589 connection->resource->write_ordering);
1590 return -EIO;
1593 epoch->flags = 0;
1594 atomic_set(&epoch->epoch_size, 0);
1595 atomic_set(&epoch->active, 0);
1597 spin_lock(&connection->epoch_lock);
1598 if (atomic_read(&connection->current_epoch->epoch_size)) {
1599 list_add(&epoch->list, &connection->current_epoch->list);
1600 connection->current_epoch = epoch;
1601 connection->epochs++;
1602 } else {
1603 /* The current_epoch got recycled while we allocated this one... */
1604 kfree(epoch);
1606 spin_unlock(&connection->epoch_lock);
1608 return 0;
1611 /* used from receive_RSDataReply (recv_resync_read)
1612 * and from receive_Data */
1613 static struct drbd_peer_request *
1614 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1615 struct packet_info *pi) __must_hold(local)
1617 struct drbd_device *device = peer_device->device;
1618 const sector_t capacity = drbd_get_capacity(device->this_bdev);
1619 struct drbd_peer_request *peer_req;
1620 struct page *page;
1621 int digest_size, err;
1622 unsigned int data_size = pi->size, ds;
1623 void *dig_in = peer_device->connection->int_dig_in;
1624 void *dig_vv = peer_device->connection->int_dig_vv;
1625 unsigned long *data;
1626 struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1628 digest_size = 0;
1629 if (!trim && peer_device->connection->peer_integrity_tfm) {
1630 digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
1632 * FIXME: Receive the incoming digest into the receive buffer
1633 * here, together with its struct p_data?
1635 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1636 if (err)
1637 return NULL;
1638 data_size -= digest_size;
1641 if (trim) {
1642 D_ASSERT(peer_device, data_size == 0);
1643 data_size = be32_to_cpu(trim->size);
1646 if (!expect(IS_ALIGNED(data_size, 512)))
1647 return NULL;
1648 /* prepare for larger trim requests. */
1649 if (!trim && !expect(data_size <= DRBD_MAX_BIO_SIZE))
1650 return NULL;
1652 /* even though we trust out peer,
1653 * we sometimes have to double check. */
1654 if (sector + (data_size>>9) > capacity) {
1655 drbd_err(device, "request from peer beyond end of local disk: "
1656 "capacity: %llus < sector: %llus + size: %u\n",
1657 (unsigned long long)capacity,
1658 (unsigned long long)sector, data_size);
1659 return NULL;
1662 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1663 * "criss-cross" setup, that might cause write-out on some other DRBD,
1664 * which in turn might block on the other node at this very place. */
1665 peer_req = drbd_alloc_peer_req(peer_device, id, sector, data_size, trim == NULL, GFP_NOIO);
1666 if (!peer_req)
1667 return NULL;
1669 peer_req->flags |= EE_WRITE;
1670 if (trim)
1671 return peer_req;
1673 ds = data_size;
1674 page = peer_req->pages;
1675 page_chain_for_each(page) {
1676 unsigned len = min_t(int, ds, PAGE_SIZE);
1677 data = kmap(page);
1678 err = drbd_recv_all_warn(peer_device->connection, data, len);
1679 if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1680 drbd_err(device, "Fault injection: Corrupting data on receive\n");
1681 data[0] = data[0] ^ (unsigned long)-1;
1683 kunmap(page);
1684 if (err) {
1685 drbd_free_peer_req(device, peer_req);
1686 return NULL;
1688 ds -= len;
1691 if (digest_size) {
1692 drbd_csum_ee(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv);
1693 if (memcmp(dig_in, dig_vv, digest_size)) {
1694 drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1695 (unsigned long long)sector, data_size);
1696 drbd_free_peer_req(device, peer_req);
1697 return NULL;
1700 device->recv_cnt += data_size >> 9;
1701 return peer_req;
1704 /* drbd_drain_block() just takes a data block
1705 * out of the socket input buffer, and discards it.
1707 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1709 struct page *page;
1710 int err = 0;
1711 void *data;
1713 if (!data_size)
1714 return 0;
1716 page = drbd_alloc_pages(peer_device, 1, 1);
1718 data = kmap(page);
1719 while (data_size) {
1720 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1722 err = drbd_recv_all_warn(peer_device->connection, data, len);
1723 if (err)
1724 break;
1725 data_size -= len;
1727 kunmap(page);
1728 drbd_free_pages(peer_device->device, page, 0);
1729 return err;
1732 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1733 sector_t sector, int data_size)
1735 struct bio_vec bvec;
1736 struct bvec_iter iter;
1737 struct bio *bio;
1738 int digest_size, err, expect;
1739 void *dig_in = peer_device->connection->int_dig_in;
1740 void *dig_vv = peer_device->connection->int_dig_vv;
1742 digest_size = 0;
1743 if (peer_device->connection->peer_integrity_tfm) {
1744 digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
1745 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1746 if (err)
1747 return err;
1748 data_size -= digest_size;
1751 /* optimistically update recv_cnt. if receiving fails below,
1752 * we disconnect anyways, and counters will be reset. */
1753 peer_device->device->recv_cnt += data_size>>9;
1755 bio = req->master_bio;
1756 D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
1758 bio_for_each_segment(bvec, bio, iter) {
1759 void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1760 expect = min_t(int, data_size, bvec.bv_len);
1761 err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1762 kunmap(bvec.bv_page);
1763 if (err)
1764 return err;
1765 data_size -= expect;
1768 if (digest_size) {
1769 drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1770 if (memcmp(dig_in, dig_vv, digest_size)) {
1771 drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
1772 return -EINVAL;
1776 D_ASSERT(peer_device->device, data_size == 0);
1777 return 0;
1781 * e_end_resync_block() is called in ack_sender context via
1782 * drbd_finish_peer_reqs().
1784 static int e_end_resync_block(struct drbd_work *w, int unused)
1786 struct drbd_peer_request *peer_req =
1787 container_of(w, struct drbd_peer_request, w);
1788 struct drbd_peer_device *peer_device = peer_req->peer_device;
1789 struct drbd_device *device = peer_device->device;
1790 sector_t sector = peer_req->i.sector;
1791 int err;
1793 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1795 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1796 drbd_set_in_sync(device, sector, peer_req->i.size);
1797 err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
1798 } else {
1799 /* Record failure to sync */
1800 drbd_rs_failed_io(device, sector, peer_req->i.size);
1802 err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1804 dec_unacked(device);
1806 return err;
1809 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
1810 struct packet_info *pi) __releases(local)
1812 struct drbd_device *device = peer_device->device;
1813 struct drbd_peer_request *peer_req;
1815 peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
1816 if (!peer_req)
1817 goto fail;
1819 dec_rs_pending(device);
1821 inc_unacked(device);
1822 /* corresponding dec_unacked() in e_end_resync_block()
1823 * respective _drbd_clear_done_ee */
1825 peer_req->w.cb = e_end_resync_block;
1826 peer_req->submit_jif = jiffies;
1828 spin_lock_irq(&device->resource->req_lock);
1829 list_add_tail(&peer_req->w.list, &device->sync_ee);
1830 spin_unlock_irq(&device->resource->req_lock);
1832 atomic_add(pi->size >> 9, &device->rs_sect_ev);
1833 if (drbd_submit_peer_request(device, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1834 return 0;
1836 /* don't care for the reason here */
1837 drbd_err(device, "submit failed, triggering re-connect\n");
1838 spin_lock_irq(&device->resource->req_lock);
1839 list_del(&peer_req->w.list);
1840 spin_unlock_irq(&device->resource->req_lock);
1842 drbd_free_peer_req(device, peer_req);
1843 fail:
1844 put_ldev(device);
1845 return -EIO;
1848 static struct drbd_request *
1849 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
1850 sector_t sector, bool missing_ok, const char *func)
1852 struct drbd_request *req;
1854 /* Request object according to our peer */
1855 req = (struct drbd_request *)(unsigned long)id;
1856 if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1857 return req;
1858 if (!missing_ok) {
1859 drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
1860 (unsigned long)id, (unsigned long long)sector);
1862 return NULL;
1865 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
1867 struct drbd_peer_device *peer_device;
1868 struct drbd_device *device;
1869 struct drbd_request *req;
1870 sector_t sector;
1871 int err;
1872 struct p_data *p = pi->data;
1874 peer_device = conn_peer_device(connection, pi->vnr);
1875 if (!peer_device)
1876 return -EIO;
1877 device = peer_device->device;
1879 sector = be64_to_cpu(p->sector);
1881 spin_lock_irq(&device->resource->req_lock);
1882 req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
1883 spin_unlock_irq(&device->resource->req_lock);
1884 if (unlikely(!req))
1885 return -EIO;
1887 /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1888 * special casing it there for the various failure cases.
1889 * still no race with drbd_fail_pending_reads */
1890 err = recv_dless_read(peer_device, req, sector, pi->size);
1891 if (!err)
1892 req_mod(req, DATA_RECEIVED);
1893 /* else: nothing. handled from drbd_disconnect...
1894 * I don't think we may complete this just yet
1895 * in case we are "on-disconnect: freeze" */
1897 return err;
1900 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
1902 struct drbd_peer_device *peer_device;
1903 struct drbd_device *device;
1904 sector_t sector;
1905 int err;
1906 struct p_data *p = pi->data;
1908 peer_device = conn_peer_device(connection, pi->vnr);
1909 if (!peer_device)
1910 return -EIO;
1911 device = peer_device->device;
1913 sector = be64_to_cpu(p->sector);
1914 D_ASSERT(device, p->block_id == ID_SYNCER);
1916 if (get_ldev(device)) {
1917 /* data is submitted to disk within recv_resync_read.
1918 * corresponding put_ldev done below on error,
1919 * or in drbd_peer_request_endio. */
1920 err = recv_resync_read(peer_device, sector, pi);
1921 } else {
1922 if (__ratelimit(&drbd_ratelimit_state))
1923 drbd_err(device, "Can not write resync data to local disk.\n");
1925 err = drbd_drain_block(peer_device, pi->size);
1927 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
1930 atomic_add(pi->size >> 9, &device->rs_sect_in);
1932 return err;
1935 static void restart_conflicting_writes(struct drbd_device *device,
1936 sector_t sector, int size)
1938 struct drbd_interval *i;
1939 struct drbd_request *req;
1941 drbd_for_each_overlap(i, &device->write_requests, sector, size) {
1942 if (!i->local)
1943 continue;
1944 req = container_of(i, struct drbd_request, i);
1945 if (req->rq_state & RQ_LOCAL_PENDING ||
1946 !(req->rq_state & RQ_POSTPONED))
1947 continue;
1948 /* as it is RQ_POSTPONED, this will cause it to
1949 * be queued on the retry workqueue. */
1950 __req_mod(req, CONFLICT_RESOLVED, NULL);
1955 * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
1957 static int e_end_block(struct drbd_work *w, int cancel)
1959 struct drbd_peer_request *peer_req =
1960 container_of(w, struct drbd_peer_request, w);
1961 struct drbd_peer_device *peer_device = peer_req->peer_device;
1962 struct drbd_device *device = peer_device->device;
1963 sector_t sector = peer_req->i.sector;
1964 int err = 0, pcmd;
1966 if (peer_req->flags & EE_SEND_WRITE_ACK) {
1967 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1968 pcmd = (device->state.conn >= C_SYNC_SOURCE &&
1969 device->state.conn <= C_PAUSED_SYNC_T &&
1970 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1971 P_RS_WRITE_ACK : P_WRITE_ACK;
1972 err = drbd_send_ack(peer_device, pcmd, peer_req);
1973 if (pcmd == P_RS_WRITE_ACK)
1974 drbd_set_in_sync(device, sector, peer_req->i.size);
1975 } else {
1976 err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1977 /* we expect it to be marked out of sync anyways...
1978 * maybe assert this? */
1980 dec_unacked(device);
1983 /* we delete from the conflict detection hash _after_ we sent out the
1984 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
1985 if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1986 spin_lock_irq(&device->resource->req_lock);
1987 D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
1988 drbd_remove_epoch_entry_interval(device, peer_req);
1989 if (peer_req->flags & EE_RESTART_REQUESTS)
1990 restart_conflicting_writes(device, sector, peer_req->i.size);
1991 spin_unlock_irq(&device->resource->req_lock);
1992 } else
1993 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1995 drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1997 return err;
2000 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
2002 struct drbd_peer_request *peer_req =
2003 container_of(w, struct drbd_peer_request, w);
2004 struct drbd_peer_device *peer_device = peer_req->peer_device;
2005 int err;
2007 err = drbd_send_ack(peer_device, ack, peer_req);
2008 dec_unacked(peer_device->device);
2010 return err;
2013 static int e_send_superseded(struct drbd_work *w, int unused)
2015 return e_send_ack(w, P_SUPERSEDED);
2018 static int e_send_retry_write(struct drbd_work *w, int unused)
2020 struct drbd_peer_request *peer_req =
2021 container_of(w, struct drbd_peer_request, w);
2022 struct drbd_connection *connection = peer_req->peer_device->connection;
2024 return e_send_ack(w, connection->agreed_pro_version >= 100 ?
2025 P_RETRY_WRITE : P_SUPERSEDED);
/*
 * seq_greater() - wrap-around aware "a is newer than b" for 32-bit sequence
 * numbers.
 *
 * Returns true if @a is ahead of @b by 1 .. 2^31-1 (modulo 2^32).
 */
static bool seq_greater(u32 a, u32 b)
{
	/*
	 * We assume 32-bit wrap-around here.
	 * For 24-bit wrap-around, we would have to shift:
	 *  a <<= 8; b <<= 8;
	 *
	 * Subtract as unsigned, then reinterpret as signed: unsigned
	 * subtraction wraps by definition, whereas subtracting two s32
	 * values could overflow.
	 */
	return (s32)(a - b) > 0;
}
2038 static u32 seq_max(u32 a, u32 b)
2040 return seq_greater(a, b) ? a : b;
2043 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
2045 struct drbd_device *device = peer_device->device;
2046 unsigned int newest_peer_seq;
2048 if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
2049 spin_lock(&device->peer_seq_lock);
2050 newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2051 device->peer_seq = newest_peer_seq;
2052 spin_unlock(&device->peer_seq_lock);
2053 /* wake up only if we actually changed device->peer_seq */
2054 if (peer_seq == newest_peer_seq)
2055 wake_up(&device->seq_wait);
/*
 * overlaps() - do two sector ranges intersect?
 * @s1, @s2 are start sectors; @l1, @l2 are lengths in bytes (converted to
 * sectors by >> 9).  Ranges that merely touch do not overlap.
 * Returns 1 if they overlap, 0 otherwise.
 */
static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
{
	const sector_t end1 = s1 + (l1 >> 9);
	const sector_t end2 = s2 + (l2 >> 9);

	return end1 > s2 && s1 < end2;
}
2064 /* maybe change sync_ee into interval trees as well? */
2065 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2067 struct drbd_peer_request *rs_req;
2068 bool rv = 0;
2070 spin_lock_irq(&device->resource->req_lock);
2071 list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2072 if (overlaps(peer_req->i.sector, peer_req->i.size,
2073 rs_req->i.sector, rs_req->i.size)) {
2074 rv = 1;
2075 break;
2078 spin_unlock_irq(&device->resource->req_lock);
2080 return rv;
2083 /* Called from receive_Data.
2084 * Synchronize packets on sock with packets on msock.
2086 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
2087 * packet traveling on msock, they are still processed in the order they have
2088 * been sent.
2090 * Note: we don't care for Ack packets overtaking P_DATA packets.
2092 * In case packet_seq is larger than device->peer_seq number, there are
2093 * outstanding packets on the msock. We wait for them to arrive.
2094 * In case we are the logically next packet, we update device->peer_seq
2095 * ourselves. Correctly handles 32bit wrap around.
2097 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2098 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2099 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2100 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
2102 * returns 0 if we may process the packet,
2103 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
2104 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2106 struct drbd_device *device = peer_device->device;
2107 DEFINE_WAIT(wait);
2108 long timeout;
2109 int ret = 0, tp;
2111 if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2112 return 0;
2114 spin_lock(&device->peer_seq_lock);
2115 for (;;) {
2116 if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2117 device->peer_seq = seq_max(device->peer_seq, peer_seq);
2118 break;
2121 if (signal_pending(current)) {
2122 ret = -ERESTARTSYS;
2123 break;
2126 rcu_read_lock();
2127 tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2128 rcu_read_unlock();
2130 if (!tp)
2131 break;
2133 /* Only need to wait if two_primaries is enabled */
2134 prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2135 spin_unlock(&device->peer_seq_lock);
2136 rcu_read_lock();
2137 timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2138 rcu_read_unlock();
2139 timeout = schedule_timeout(timeout);
2140 spin_lock(&device->peer_seq_lock);
2141 if (!timeout) {
2142 ret = -ETIMEDOUT;
2143 drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2144 break;
2147 spin_unlock(&device->peer_seq_lock);
2148 finish_wait(&device->seq_wait, &wait);
2149 return ret;
2152 /* see also bio_flags_to_wire()
2153 * DRBD_REQ_*, because we need to semantically map the flags to data packet
2154 * flags and back. We may replicate to other kernel versions. */
2155 static unsigned long wire_flags_to_bio(u32 dpf)
2157 return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2158 (dpf & DP_FUA ? REQ_FUA : 0) |
2159 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
2160 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
2163 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2164 unsigned int size)
2166 struct drbd_interval *i;
2168 repeat:
2169 drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2170 struct drbd_request *req;
2171 struct bio_and_error m;
2173 if (!i->local)
2174 continue;
2175 req = container_of(i, struct drbd_request, i);
2176 if (!(req->rq_state & RQ_POSTPONED))
2177 continue;
2178 req->rq_state &= ~RQ_POSTPONED;
2179 __req_mod(req, NEG_ACKED, &m);
2180 spin_unlock_irq(&device->resource->req_lock);
2181 if (m.bio)
2182 complete_master_bio(device, &m);
2183 spin_lock_irq(&device->resource->req_lock);
2184 goto repeat;
2188 static int handle_write_conflicts(struct drbd_device *device,
2189 struct drbd_peer_request *peer_req)
2191 struct drbd_connection *connection = peer_req->peer_device->connection;
2192 bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2193 sector_t sector = peer_req->i.sector;
2194 const unsigned int size = peer_req->i.size;
2195 struct drbd_interval *i;
2196 bool equal;
2197 int err;
2200 * Inserting the peer request into the write_requests tree will prevent
2201 * new conflicting local requests from being added.
2203 drbd_insert_interval(&device->write_requests, &peer_req->i);
2205 repeat:
2206 drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2207 if (i == &peer_req->i)
2208 continue;
2209 if (i->completed)
2210 continue;
2212 if (!i->local) {
2214 * Our peer has sent a conflicting remote request; this
2215 * should not happen in a two-node setup. Wait for the
2216 * earlier peer request to complete.
2218 err = drbd_wait_misc(device, i);
2219 if (err)
2220 goto out;
2221 goto repeat;
2224 equal = i->sector == sector && i->size == size;
2225 if (resolve_conflicts) {
2227 * If the peer request is fully contained within the
2228 * overlapping request, it can be considered overwritten
2229 * and thus superseded; otherwise, it will be retried
2230 * once all overlapping requests have completed.
2232 bool superseded = i->sector <= sector && i->sector +
2233 (i->size >> 9) >= sector + (size >> 9);
2235 if (!equal)
2236 drbd_alert(device, "Concurrent writes detected: "
2237 "local=%llus +%u, remote=%llus +%u, "
2238 "assuming %s came first\n",
2239 (unsigned long long)i->sector, i->size,
2240 (unsigned long long)sector, size,
2241 superseded ? "local" : "remote");
2243 peer_req->w.cb = superseded ? e_send_superseded :
2244 e_send_retry_write;
2245 list_add_tail(&peer_req->w.list, &device->done_ee);
2246 queue_work(connection->ack_sender, &peer_req->peer_device->send_acks_work);
2248 err = -ENOENT;
2249 goto out;
2250 } else {
2251 struct drbd_request *req =
2252 container_of(i, struct drbd_request, i);
2254 if (!equal)
2255 drbd_alert(device, "Concurrent writes detected: "
2256 "local=%llus +%u, remote=%llus +%u\n",
2257 (unsigned long long)i->sector, i->size,
2258 (unsigned long long)sector, size);
2260 if (req->rq_state & RQ_LOCAL_PENDING ||
2261 !(req->rq_state & RQ_POSTPONED)) {
2263 * Wait for the node with the discard flag to
2264 * decide if this request has been superseded
2265 * or needs to be retried.
2266 * Requests that have been superseded will
2267 * disappear from the write_requests tree.
2269 * In addition, wait for the conflicting
2270 * request to finish locally before submitting
2271 * the conflicting peer request.
2273 err = drbd_wait_misc(device, &req->i);
2274 if (err) {
2275 _conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2276 fail_postponed_requests(device, sector, size);
2277 goto out;
2279 goto repeat;
2282 * Remember to restart the conflicting requests after
2283 * the new peer request has completed.
2285 peer_req->flags |= EE_RESTART_REQUESTS;
2288 err = 0;
2290 out:
2291 if (err)
2292 drbd_remove_epoch_entry_interval(device, peer_req);
2293 return err;
2296 /* mirrored write */
2297 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2299 struct drbd_peer_device *peer_device;
2300 struct drbd_device *device;
2301 struct net_conf *nc;
2302 sector_t sector;
2303 struct drbd_peer_request *peer_req;
2304 struct p_data *p = pi->data;
2305 u32 peer_seq = be32_to_cpu(p->seq_num);
2306 int rw = WRITE;
2307 u32 dp_flags;
2308 int err, tp;
2310 peer_device = conn_peer_device(connection, pi->vnr);
2311 if (!peer_device)
2312 return -EIO;
2313 device = peer_device->device;
2315 if (!get_ldev(device)) {
2316 int err2;
2318 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2319 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2320 atomic_inc(&connection->current_epoch->epoch_size);
2321 err2 = drbd_drain_block(peer_device, pi->size);
2322 if (!err)
2323 err = err2;
2324 return err;
2328 * Corresponding put_ldev done either below (on various errors), or in
2329 * drbd_peer_request_endio, if we successfully submit the data at the
2330 * end of this function.
2333 sector = be64_to_cpu(p->sector);
2334 peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2335 if (!peer_req) {
2336 put_ldev(device);
2337 return -EIO;
2340 peer_req->w.cb = e_end_block;
2341 peer_req->submit_jif = jiffies;
2342 peer_req->flags |= EE_APPLICATION;
2344 dp_flags = be32_to_cpu(p->dp_flags);
2345 rw |= wire_flags_to_bio(dp_flags);
2346 if (pi->cmd == P_TRIM) {
2347 struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
2348 peer_req->flags |= EE_IS_TRIM;
2349 if (!blk_queue_discard(q))
2350 peer_req->flags |= EE_IS_TRIM_USE_ZEROOUT;
2351 D_ASSERT(peer_device, peer_req->i.size > 0);
2352 D_ASSERT(peer_device, rw & REQ_DISCARD);
2353 D_ASSERT(peer_device, peer_req->pages == NULL);
2354 } else if (peer_req->pages == NULL) {
2355 D_ASSERT(device, peer_req->i.size == 0);
2356 D_ASSERT(device, dp_flags & DP_FLUSH);
2359 if (dp_flags & DP_MAY_SET_IN_SYNC)
2360 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2362 spin_lock(&connection->epoch_lock);
2363 peer_req->epoch = connection->current_epoch;
2364 atomic_inc(&peer_req->epoch->epoch_size);
2365 atomic_inc(&peer_req->epoch->active);
2366 spin_unlock(&connection->epoch_lock);
2368 rcu_read_lock();
2369 nc = rcu_dereference(peer_device->connection->net_conf);
2370 tp = nc->two_primaries;
2371 if (peer_device->connection->agreed_pro_version < 100) {
2372 switch (nc->wire_protocol) {
2373 case DRBD_PROT_C:
2374 dp_flags |= DP_SEND_WRITE_ACK;
2375 break;
2376 case DRBD_PROT_B:
2377 dp_flags |= DP_SEND_RECEIVE_ACK;
2378 break;
2381 rcu_read_unlock();
2383 if (dp_flags & DP_SEND_WRITE_ACK) {
2384 peer_req->flags |= EE_SEND_WRITE_ACK;
2385 inc_unacked(device);
2386 /* corresponding dec_unacked() in e_end_block()
2387 * respective _drbd_clear_done_ee */
2390 if (dp_flags & DP_SEND_RECEIVE_ACK) {
2391 /* I really don't like it that the receiver thread
2392 * sends on the msock, but anyways */
2393 drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
2396 if (tp) {
2397 /* two primaries implies protocol C */
2398 D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
2399 peer_req->flags |= EE_IN_INTERVAL_TREE;
2400 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2401 if (err)
2402 goto out_interrupted;
2403 spin_lock_irq(&device->resource->req_lock);
2404 err = handle_write_conflicts(device, peer_req);
2405 if (err) {
2406 spin_unlock_irq(&device->resource->req_lock);
2407 if (err == -ENOENT) {
2408 put_ldev(device);
2409 return 0;
2411 goto out_interrupted;
2413 } else {
2414 update_peer_seq(peer_device, peer_seq);
2415 spin_lock_irq(&device->resource->req_lock);
2417 /* if we use the zeroout fallback code, we process synchronously
2418 * and we wait for all pending requests, respectively wait for
2419 * active_ee to become empty in drbd_submit_peer_request();
2420 * better not add ourselves here. */
2421 if ((peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) == 0)
2422 list_add_tail(&peer_req->w.list, &device->active_ee);
2423 spin_unlock_irq(&device->resource->req_lock);
2425 if (device->state.conn == C_SYNC_TARGET)
2426 wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2428 if (device->state.pdsk < D_INCONSISTENT) {
2429 /* In case we have the only disk of the cluster, */
2430 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2431 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2432 drbd_al_begin_io(device, &peer_req->i);
2433 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2436 err = drbd_submit_peer_request(device, peer_req, rw, DRBD_FAULT_DT_WR);
2437 if (!err)
2438 return 0;
2440 /* don't care for the reason here */
2441 drbd_err(device, "submit failed, triggering re-connect\n");
2442 spin_lock_irq(&device->resource->req_lock);
2443 list_del(&peer_req->w.list);
2444 drbd_remove_epoch_entry_interval(device, peer_req);
2445 spin_unlock_irq(&device->resource->req_lock);
2446 if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
2447 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
2448 drbd_al_complete_io(device, &peer_req->i);
2451 out_interrupted:
2452 drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT + EV_CLEANUP);
2453 put_ldev(device);
2454 drbd_free_peer_req(device, peer_req);
2455 return err;
2458 /* We may throttle resync, if the lower device seems to be busy,
2459 * and current sync rate is above c_min_rate.
2461 * To decide whether or not the lower device is busy, we use a scheme similar
2462 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2463 * (more than 64 sectors) of activity we cannot account for with our own resync
2464 * activity, it obviously is "busy".
2466 * The current sync rate used here uses only the most recent two step marks,
2467 * to have a short time average so we can react faster.
2469 bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
2470 bool throttle_if_app_is_waiting)
2472 struct lc_element *tmp;
2473 bool throttle = drbd_rs_c_min_rate_throttle(device);
2475 if (!throttle || throttle_if_app_is_waiting)
2476 return throttle;
2478 spin_lock_irq(&device->al_lock);
2479 tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2480 if (tmp) {
2481 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2482 if (test_bit(BME_PRIORITY, &bm_ext->flags))
2483 throttle = false;
2484 /* Do not slow down if app IO is already waiting for this extent,
2485 * and our progress is necessary for application IO to complete. */
2487 spin_unlock_irq(&device->al_lock);
2489 return throttle;
2492 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2494 struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2495 unsigned long db, dt, dbdt;
2496 unsigned int c_min_rate;
2497 int curr_events;
2499 rcu_read_lock();
2500 c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2501 rcu_read_unlock();
2503 /* feature disabled? */
2504 if (c_min_rate == 0)
2505 return false;
2507 curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2508 (int)part_stat_read(&disk->part0, sectors[1]) -
2509 atomic_read(&device->rs_sect_ev);
2511 if (atomic_read(&device->ap_actlog_cnt)
2512 || curr_events - device->rs_last_events > 64) {
2513 unsigned long rs_left;
2514 int i;
2516 device->rs_last_events = curr_events;
2518 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2519 * approx. */
2520 i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2522 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2523 rs_left = device->ov_left;
2524 else
2525 rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2527 dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2528 if (!dt)
2529 dt++;
2530 db = device->rs_mark_left[i] - rs_left;
2531 dbdt = Bit2KB(db/dt);
2533 if (dbdt > c_min_rate)
2534 return true;
2536 return false;
2539 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2541 struct drbd_peer_device *peer_device;
2542 struct drbd_device *device;
2543 sector_t sector;
2544 sector_t capacity;
2545 struct drbd_peer_request *peer_req;
2546 struct digest_info *di = NULL;
2547 int size, verb;
2548 unsigned int fault_type;
2549 struct p_block_req *p = pi->data;
2551 peer_device = conn_peer_device(connection, pi->vnr);
2552 if (!peer_device)
2553 return -EIO;
2554 device = peer_device->device;
2555 capacity = drbd_get_capacity(device->this_bdev);
2557 sector = be64_to_cpu(p->sector);
2558 size = be32_to_cpu(p->blksize);
2560 if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2561 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2562 (unsigned long long)sector, size);
2563 return -EINVAL;
2565 if (sector + (size>>9) > capacity) {
2566 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2567 (unsigned long long)sector, size);
2568 return -EINVAL;
2571 if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2572 verb = 1;
2573 switch (pi->cmd) {
2574 case P_DATA_REQUEST:
2575 drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2576 break;
2577 case P_RS_DATA_REQUEST:
2578 case P_CSUM_RS_REQUEST:
2579 case P_OV_REQUEST:
2580 drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
2581 break;
2582 case P_OV_REPLY:
2583 verb = 0;
2584 dec_rs_pending(device);
2585 drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2586 break;
2587 default:
2588 BUG();
2590 if (verb && __ratelimit(&drbd_ratelimit_state))
2591 drbd_err(device, "Can not satisfy peer's read request, "
2592 "no local data.\n");
2594 /* drain possibly payload */
2595 return drbd_drain_block(peer_device, pi->size);
2598 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2599 * "criss-cross" setup, that might cause write-out on some other DRBD,
2600 * which in turn might block on the other node at this very place. */
2601 peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2602 true /* has real payload */, GFP_NOIO);
2603 if (!peer_req) {
2604 put_ldev(device);
2605 return -ENOMEM;
2608 switch (pi->cmd) {
2609 case P_DATA_REQUEST:
2610 peer_req->w.cb = w_e_end_data_req;
2611 fault_type = DRBD_FAULT_DT_RD;
2612 /* application IO, don't drbd_rs_begin_io */
2613 peer_req->flags |= EE_APPLICATION;
2614 goto submit;
2616 case P_RS_DATA_REQUEST:
2617 peer_req->w.cb = w_e_end_rsdata_req;
2618 fault_type = DRBD_FAULT_RS_RD;
2619 /* used in the sector offset progress display */
2620 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2621 break;
2623 case P_OV_REPLY:
2624 case P_CSUM_RS_REQUEST:
2625 fault_type = DRBD_FAULT_RS_RD;
2626 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2627 if (!di)
2628 goto out_free_e;
2630 di->digest_size = pi->size;
2631 di->digest = (((char *)di)+sizeof(struct digest_info));
2633 peer_req->digest = di;
2634 peer_req->flags |= EE_HAS_DIGEST;
2636 if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2637 goto out_free_e;
2639 if (pi->cmd == P_CSUM_RS_REQUEST) {
2640 D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2641 peer_req->w.cb = w_e_end_csum_rs_req;
2642 /* used in the sector offset progress display */
2643 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2644 /* remember to report stats in drbd_resync_finished */
2645 device->use_csums = true;
2646 } else if (pi->cmd == P_OV_REPLY) {
2647 /* track progress, we may need to throttle */
2648 atomic_add(size >> 9, &device->rs_sect_in);
2649 peer_req->w.cb = w_e_end_ov_reply;
2650 dec_rs_pending(device);
2651 /* drbd_rs_begin_io done when we sent this request,
2652 * but accounting still needs to be done. */
2653 goto submit_for_resync;
2655 break;
2657 case P_OV_REQUEST:
2658 if (device->ov_start_sector == ~(sector_t)0 &&
2659 peer_device->connection->agreed_pro_version >= 90) {
2660 unsigned long now = jiffies;
2661 int i;
2662 device->ov_start_sector = sector;
2663 device->ov_position = sector;
2664 device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2665 device->rs_total = device->ov_left;
2666 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2667 device->rs_mark_left[i] = device->ov_left;
2668 device->rs_mark_time[i] = now;
2670 drbd_info(device, "Online Verify start sector: %llu\n",
2671 (unsigned long long)sector);
2673 peer_req->w.cb = w_e_end_ov_req;
2674 fault_type = DRBD_FAULT_RS_RD;
2675 break;
2677 default:
2678 BUG();
2681 /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2682 * wrt the receiver, but it is not as straightforward as it may seem.
2683 * Various places in the resync start and stop logic assume resync
2684 * requests are processed in order, requeuing this on the worker thread
2685 * introduces a bunch of new code for synchronization between threads.
2687 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2688 * "forever", throttling after drbd_rs_begin_io will lock that extent
2689 * for application writes for the same time. For now, just throttle
2690 * here, where the rest of the code expects the receiver to sleep for
2691 * a while, anyways.
2694 /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2695 * this defers syncer requests for some time, before letting at least
2696 * on request through. The resync controller on the receiving side
2697 * will adapt to the incoming rate accordingly.
2699 * We cannot throttle here if remote is Primary/SyncTarget:
2700 * we would also throttle its application reads.
2701 * In that case, throttling is done on the SyncTarget only.
2704 /* Even though this may be a resync request, we do add to "read_ee";
2705 * "sync_ee" is only used for resync WRITEs.
2706 * Add to list early, so debugfs can find this request
2707 * even if we have to sleep below. */
2708 spin_lock_irq(&device->resource->req_lock);
2709 list_add_tail(&peer_req->w.list, &device->read_ee);
2710 spin_unlock_irq(&device->resource->req_lock);
2712 update_receiver_timing_details(connection, drbd_rs_should_slow_down);
2713 if (device->state.peer != R_PRIMARY
2714 && drbd_rs_should_slow_down(device, sector, false))
2715 schedule_timeout_uninterruptible(HZ/10);
2716 update_receiver_timing_details(connection, drbd_rs_begin_io);
2717 if (drbd_rs_begin_io(device, sector))
2718 goto out_free_e;
2720 submit_for_resync:
2721 atomic_add(size >> 9, &device->rs_sect_ev);
2723 submit:
2724 update_receiver_timing_details(connection, drbd_submit_peer_request);
2725 inc_unacked(device);
2726 if (drbd_submit_peer_request(device, peer_req, READ, fault_type) == 0)
2727 return 0;
2729 /* don't care for the reason here */
2730 drbd_err(device, "submit failed, triggering re-connect\n");
2732 out_free_e:
2733 spin_lock_irq(&device->resource->req_lock);
2734 list_del(&peer_req->w.list);
2735 spin_unlock_irq(&device->resource->req_lock);
2736 /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2738 put_ldev(device);
2739 drbd_free_peer_req(device, peer_req);
2740 return -EIO;
2744 * drbd_asb_recover_0p - Recover after split-brain with no remaining primaries
2746 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
2748 struct drbd_device *device = peer_device->device;
2749 int self, peer, rv = -100;
2750 unsigned long ch_self, ch_peer;
2751 enum drbd_after_sb_p after_sb_0p;
2753 self = device->ldev->md.uuid[UI_BITMAP] & 1;
2754 peer = device->p_uuid[UI_BITMAP] & 1;
2756 ch_peer = device->p_uuid[UI_SIZE];
2757 ch_self = device->comm_bm_set;
2759 rcu_read_lock();
2760 after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
2761 rcu_read_unlock();
2762 switch (after_sb_0p) {
2763 case ASB_CONSENSUS:
2764 case ASB_DISCARD_SECONDARY:
2765 case ASB_CALL_HELPER:
2766 case ASB_VIOLENTLY:
2767 drbd_err(device, "Configuration error.\n");
2768 break;
2769 case ASB_DISCONNECT:
2770 break;
2771 case ASB_DISCARD_YOUNGER_PRI:
2772 if (self == 0 && peer == 1) {
2773 rv = -1;
2774 break;
2776 if (self == 1 && peer == 0) {
2777 rv = 1;
2778 break;
2780 /* Else fall through to one of the other strategies... */
2781 case ASB_DISCARD_OLDER_PRI:
2782 if (self == 0 && peer == 1) {
2783 rv = 1;
2784 break;
2786 if (self == 1 && peer == 0) {
2787 rv = -1;
2788 break;
2790 /* Else fall through to one of the other strategies... */
2791 drbd_warn(device, "Discard younger/older primary did not find a decision\n"
2792 "Using discard-least-changes instead\n");
2793 case ASB_DISCARD_ZERO_CHG:
2794 if (ch_peer == 0 && ch_self == 0) {
2795 rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2796 ? -1 : 1;
2797 break;
2798 } else {
2799 if (ch_peer == 0) { rv = 1; break; }
2800 if (ch_self == 0) { rv = -1; break; }
2802 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2803 break;
2804 case ASB_DISCARD_LEAST_CHG:
2805 if (ch_self < ch_peer)
2806 rv = -1;
2807 else if (ch_self > ch_peer)
2808 rv = 1;
2809 else /* ( ch_self == ch_peer ) */
2810 /* Well, then use something else. */
2811 rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2812 ? -1 : 1;
2813 break;
2814 case ASB_DISCARD_LOCAL:
2815 rv = -1;
2816 break;
2817 case ASB_DISCARD_REMOTE:
2818 rv = 1;
2821 return rv;
2825 * drbd_asb_recover_1p - Recover after split-brain with one remaining primary
2827 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
2829 struct drbd_device *device = peer_device->device;
2830 int hg, rv = -100;
2831 enum drbd_after_sb_p after_sb_1p;
2833 rcu_read_lock();
2834 after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
2835 rcu_read_unlock();
2836 switch (after_sb_1p) {
2837 case ASB_DISCARD_YOUNGER_PRI:
2838 case ASB_DISCARD_OLDER_PRI:
2839 case ASB_DISCARD_LEAST_CHG:
2840 case ASB_DISCARD_LOCAL:
2841 case ASB_DISCARD_REMOTE:
2842 case ASB_DISCARD_ZERO_CHG:
2843 drbd_err(device, "Configuration error.\n");
2844 break;
2845 case ASB_DISCONNECT:
2846 break;
2847 case ASB_CONSENSUS:
2848 hg = drbd_asb_recover_0p(peer_device);
2849 if (hg == -1 && device->state.role == R_SECONDARY)
2850 rv = hg;
2851 if (hg == 1 && device->state.role == R_PRIMARY)
2852 rv = hg;
2853 break;
2854 case ASB_VIOLENTLY:
2855 rv = drbd_asb_recover_0p(peer_device);
2856 break;
2857 case ASB_DISCARD_SECONDARY:
2858 return device->state.role == R_PRIMARY ? 1 : -1;
2859 case ASB_CALL_HELPER:
2860 hg = drbd_asb_recover_0p(peer_device);
2861 if (hg == -1 && device->state.role == R_PRIMARY) {
2862 enum drbd_state_rv rv2;
2864 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2865 * we might be here in C_WF_REPORT_PARAMS which is transient.
2866 * we do not need to wait for the after state change work either. */
2867 rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2868 if (rv2 != SS_SUCCESS) {
2869 drbd_khelper(device, "pri-lost-after-sb");
2870 } else {
2871 drbd_warn(device, "Successfully gave up primary role.\n");
2872 rv = hg;
2874 } else
2875 rv = hg;
2878 return rv;
2882 * drbd_asb_recover_2p - Recover after split-brain with two remaining primaries
2884 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
2886 struct drbd_device *device = peer_device->device;
2887 int hg, rv = -100;
2888 enum drbd_after_sb_p after_sb_2p;
2890 rcu_read_lock();
2891 after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
2892 rcu_read_unlock();
2893 switch (after_sb_2p) {
2894 case ASB_DISCARD_YOUNGER_PRI:
2895 case ASB_DISCARD_OLDER_PRI:
2896 case ASB_DISCARD_LEAST_CHG:
2897 case ASB_DISCARD_LOCAL:
2898 case ASB_DISCARD_REMOTE:
2899 case ASB_CONSENSUS:
2900 case ASB_DISCARD_SECONDARY:
2901 case ASB_DISCARD_ZERO_CHG:
2902 drbd_err(device, "Configuration error.\n");
2903 break;
2904 case ASB_VIOLENTLY:
2905 rv = drbd_asb_recover_0p(peer_device);
2906 break;
2907 case ASB_DISCONNECT:
2908 break;
2909 case ASB_CALL_HELPER:
2910 hg = drbd_asb_recover_0p(peer_device);
2911 if (hg == -1) {
2912 enum drbd_state_rv rv2;
2914 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2915 * we might be here in C_WF_REPORT_PARAMS which is transient.
2916 * we do not need to wait for the after state change work either. */
2917 rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2918 if (rv2 != SS_SUCCESS) {
2919 drbd_khelper(device, "pri-lost-after-sb");
2920 } else {
2921 drbd_warn(device, "Successfully gave up primary role.\n");
2922 rv = hg;
2924 } else
2925 rv = hg;
2928 return rv;
2931 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
2932 u64 bits, u64 flags)
2934 if (!uuid) {
2935 drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
2936 return;
2938 drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2939 text,
2940 (unsigned long long)uuid[UI_CURRENT],
2941 (unsigned long long)uuid[UI_BITMAP],
2942 (unsigned long long)uuid[UI_HISTORY_START],
2943 (unsigned long long)uuid[UI_HISTORY_END],
2944 (unsigned long long)bits,
2945 (unsigned long long)flags);
2949 100 after split brain try auto recover
2950 2 C_SYNC_SOURCE set BitMap
2951 1 C_SYNC_SOURCE use BitMap
2952 0 no Sync
2953 -1 C_SYNC_TARGET use BitMap
2954 -2 C_SYNC_TARGET set BitMap
2955 -100 after split brain, disconnect
2956 -1000 unrelated data
2957 -1091 requires proto 91
2958 -1096 requires proto 96
2960 static int drbd_uuid_compare(struct drbd_device *const device, int *rule_nr) __must_hold(local)
2962 struct drbd_peer_device *const peer_device = first_peer_device(device);
2963 struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
2964 u64 self, peer;
2965 int i, j;
2967 self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2968 peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2970 *rule_nr = 10;
2971 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2972 return 0;
2974 *rule_nr = 20;
2975 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2976 peer != UUID_JUST_CREATED)
2977 return -2;
2979 *rule_nr = 30;
2980 if (self != UUID_JUST_CREATED &&
2981 (peer == UUID_JUST_CREATED || peer == (u64)0))
2982 return 2;
2984 if (self == peer) {
2985 int rct, dc; /* roles at crash time */
2987 if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2989 if (connection->agreed_pro_version < 91)
2990 return -1091;
2992 if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2993 (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2994 drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
2995 drbd_uuid_move_history(device);
2996 device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
2997 device->ldev->md.uuid[UI_BITMAP] = 0;
2999 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3000 device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3001 *rule_nr = 34;
3002 } else {
3003 drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
3004 *rule_nr = 36;
3007 return 1;
3010 if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
3012 if (connection->agreed_pro_version < 91)
3013 return -1091;
3015 if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
3016 (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
3017 drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
3019 device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
3020 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
3021 device->p_uuid[UI_BITMAP] = 0UL;
3023 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3024 *rule_nr = 35;
3025 } else {
3026 drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
3027 *rule_nr = 37;
3030 return -1;
3033 /* Common power [off|failure] */
3034 rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
3035 (device->p_uuid[UI_FLAGS] & 2);
3036 /* lowest bit is set when we were primary,
3037 * next bit (weight 2) is set when peer was primary */
3038 *rule_nr = 40;
3040 switch (rct) {
3041 case 0: /* !self_pri && !peer_pri */ return 0;
3042 case 1: /* self_pri && !peer_pri */ return 1;
3043 case 2: /* !self_pri && peer_pri */ return -1;
3044 case 3: /* self_pri && peer_pri */
3045 dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
3046 return dc ? -1 : 1;
3050 *rule_nr = 50;
3051 peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3052 if (self == peer)
3053 return -1;
3055 *rule_nr = 51;
3056 peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
3057 if (self == peer) {
3058 if (connection->agreed_pro_version < 96 ?
3059 (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
3060 (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
3061 peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
3062 /* The last P_SYNC_UUID did not get though. Undo the last start of
3063 resync as sync source modifications of the peer's UUIDs. */
3065 if (connection->agreed_pro_version < 91)
3066 return -1091;
3068 device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
3069 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
3071 drbd_info(device, "Lost last syncUUID packet, corrected:\n");
3072 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3074 return -1;
3078 *rule_nr = 60;
3079 self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3080 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3081 peer = device->p_uuid[i] & ~((u64)1);
3082 if (self == peer)
3083 return -2;
3086 *rule_nr = 70;
3087 self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3088 peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3089 if (self == peer)
3090 return 1;
3092 *rule_nr = 71;
3093 self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
3094 if (self == peer) {
3095 if (connection->agreed_pro_version < 96 ?
3096 (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
3097 (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
3098 self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
3099 /* The last P_SYNC_UUID did not get though. Undo the last start of
3100 resync as sync source modifications of our UUIDs. */
3102 if (connection->agreed_pro_version < 91)
3103 return -1091;
3105 __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
3106 __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
3108 drbd_info(device, "Last syncUUID did not get through, corrected:\n");
3109 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3110 device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3112 return 1;
3117 *rule_nr = 80;
3118 peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3119 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3120 self = device->ldev->md.uuid[i] & ~((u64)1);
3121 if (self == peer)
3122 return 2;
3125 *rule_nr = 90;
3126 self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3127 peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3128 if (self == peer && self != ((u64)0))
3129 return 100;
3131 *rule_nr = 100;
3132 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3133 self = device->ldev->md.uuid[i] & ~((u64)1);
3134 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3135 peer = device->p_uuid[j] & ~((u64)1);
3136 if (self == peer)
3137 return -100;
3141 return -1000;
3144 /* drbd_sync_handshake() returns the new conn state on success, or
3145 C_MASK on failure.
3147 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3148 enum drbd_role peer_role,
3149 enum drbd_disk_state peer_disk) __must_hold(local)
3151 struct drbd_device *device = peer_device->device;
3152 enum drbd_conns rv = C_MASK;
3153 enum drbd_disk_state mydisk;
3154 struct net_conf *nc;
3155 int hg, rule_nr, rr_conflict, tentative;
3157 mydisk = device->state.disk;
3158 if (mydisk == D_NEGOTIATING)
3159 mydisk = device->new_state_tmp.disk;
3161 drbd_info(device, "drbd_sync_handshake:\n");
3163 spin_lock_irq(&device->ldev->md.uuid_lock);
3164 drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3165 drbd_uuid_dump(device, "peer", device->p_uuid,
3166 device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3168 hg = drbd_uuid_compare(device, &rule_nr);
3169 spin_unlock_irq(&device->ldev->md.uuid_lock);
3171 drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3173 if (hg == -1000) {
3174 drbd_alert(device, "Unrelated data, aborting!\n");
3175 return C_MASK;
3177 if (hg < -1000) {
3178 drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3179 return C_MASK;
3182 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3183 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
3184 int f = (hg == -100) || abs(hg) == 2;
3185 hg = mydisk > D_INCONSISTENT ? 1 : -1;
3186 if (f)
3187 hg = hg*2;
3188 drbd_info(device, "Becoming sync %s due to disk states.\n",
3189 hg > 0 ? "source" : "target");
3192 if (abs(hg) == 100)
3193 drbd_khelper(device, "initial-split-brain");
3195 rcu_read_lock();
3196 nc = rcu_dereference(peer_device->connection->net_conf);
3198 if (hg == 100 || (hg == -100 && nc->always_asbp)) {
3199 int pcount = (device->state.role == R_PRIMARY)
3200 + (peer_role == R_PRIMARY);
3201 int forced = (hg == -100);
3203 switch (pcount) {
3204 case 0:
3205 hg = drbd_asb_recover_0p(peer_device);
3206 break;
3207 case 1:
3208 hg = drbd_asb_recover_1p(peer_device);
3209 break;
3210 case 2:
3211 hg = drbd_asb_recover_2p(peer_device);
3212 break;
3214 if (abs(hg) < 100) {
3215 drbd_warn(device, "Split-Brain detected, %d primaries, "
3216 "automatically solved. Sync from %s node\n",
3217 pcount, (hg < 0) ? "peer" : "this");
3218 if (forced) {
3219 drbd_warn(device, "Doing a full sync, since"
3220 " UUIDs where ambiguous.\n");
3221 hg = hg*2;
3226 if (hg == -100) {
3227 if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3228 hg = -1;
3229 if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3230 hg = 1;
3232 if (abs(hg) < 100)
3233 drbd_warn(device, "Split-Brain detected, manually solved. "
3234 "Sync from %s node\n",
3235 (hg < 0) ? "peer" : "this");
3237 rr_conflict = nc->rr_conflict;
3238 tentative = nc->tentative;
3239 rcu_read_unlock();
3241 if (hg == -100) {
3242 /* FIXME this log message is not correct if we end up here
3243 * after an attempted attach on a diskless node.
3244 * We just refuse to attach -- well, we drop the "connection"
3245 * to that disk, in a way... */
3246 drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3247 drbd_khelper(device, "split-brain");
3248 return C_MASK;
3251 if (hg > 0 && mydisk <= D_INCONSISTENT) {
3252 drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3253 return C_MASK;
3256 if (hg < 0 && /* by intention we do not use mydisk here. */
3257 device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3258 switch (rr_conflict) {
3259 case ASB_CALL_HELPER:
3260 drbd_khelper(device, "pri-lost");
3261 /* fall through */
3262 case ASB_DISCONNECT:
3263 drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3264 return C_MASK;
3265 case ASB_VIOLENTLY:
3266 drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
3267 "assumption\n");
3271 if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3272 if (hg == 0)
3273 drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3274 else
3275 drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
3276 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3277 abs(hg) >= 2 ? "full" : "bit-map based");
3278 return C_MASK;
3281 if (abs(hg) >= 2) {
3282 drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3283 if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3284 BM_LOCKED_SET_ALLOWED))
3285 return C_MASK;
3288 if (hg > 0) { /* become sync source. */
3289 rv = C_WF_BITMAP_S;
3290 } else if (hg < 0) { /* become sync target */
3291 rv = C_WF_BITMAP_T;
3292 } else {
3293 rv = C_CONNECTED;
3294 if (drbd_bm_total_weight(device)) {
3295 drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3296 drbd_bm_total_weight(device));
3300 return rv;
3303 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3305 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3306 if (peer == ASB_DISCARD_REMOTE)
3307 return ASB_DISCARD_LOCAL;
3309 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3310 if (peer == ASB_DISCARD_LOCAL)
3311 return ASB_DISCARD_REMOTE;
3313 /* everything else is valid if they are equal on both sides. */
3314 return peer;
3317 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3319 struct p_protocol *p = pi->data;
3320 enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3321 int p_proto, p_discard_my_data, p_two_primaries, cf;
3322 struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3323 char integrity_alg[SHARED_SECRET_MAX] = "";
3324 struct crypto_ahash *peer_integrity_tfm = NULL;
3325 void *int_dig_in = NULL, *int_dig_vv = NULL;
3327 p_proto = be32_to_cpu(p->protocol);
3328 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
3329 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
3330 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
3331 p_two_primaries = be32_to_cpu(p->two_primaries);
3332 cf = be32_to_cpu(p->conn_flags);
3333 p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3335 if (connection->agreed_pro_version >= 87) {
3336 int err;
3338 if (pi->size > sizeof(integrity_alg))
3339 return -EIO;
3340 err = drbd_recv_all(connection, integrity_alg, pi->size);
3341 if (err)
3342 return err;
3343 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3346 if (pi->cmd != P_PROTOCOL_UPDATE) {
3347 clear_bit(CONN_DRY_RUN, &connection->flags);
3349 if (cf & CF_DRY_RUN)
3350 set_bit(CONN_DRY_RUN, &connection->flags);
3352 rcu_read_lock();
3353 nc = rcu_dereference(connection->net_conf);
3355 if (p_proto != nc->wire_protocol) {
3356 drbd_err(connection, "incompatible %s settings\n", "protocol");
3357 goto disconnect_rcu_unlock;
3360 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3361 drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3362 goto disconnect_rcu_unlock;
3365 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3366 drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3367 goto disconnect_rcu_unlock;
3370 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3371 drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3372 goto disconnect_rcu_unlock;
3375 if (p_discard_my_data && nc->discard_my_data) {
3376 drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3377 goto disconnect_rcu_unlock;
3380 if (p_two_primaries != nc->two_primaries) {
3381 drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3382 goto disconnect_rcu_unlock;
3385 if (strcmp(integrity_alg, nc->integrity_alg)) {
3386 drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3387 goto disconnect_rcu_unlock;
3390 rcu_read_unlock();
3393 if (integrity_alg[0]) {
3394 int hash_size;
3397 * We can only change the peer data integrity algorithm
3398 * here. Changing our own data integrity algorithm
3399 * requires that we send a P_PROTOCOL_UPDATE packet at
3400 * the same time; otherwise, the peer has no way to
3401 * tell between which packets the algorithm should
3402 * change.
3405 peer_integrity_tfm = crypto_alloc_ahash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3406 if (!peer_integrity_tfm) {
3407 drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3408 integrity_alg);
3409 goto disconnect;
3412 hash_size = crypto_ahash_digestsize(peer_integrity_tfm);
3413 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3414 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3415 if (!(int_dig_in && int_dig_vv)) {
3416 drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3417 goto disconnect;
3421 new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3422 if (!new_net_conf) {
3423 drbd_err(connection, "Allocation of new net_conf failed\n");
3424 goto disconnect;
3427 mutex_lock(&connection->data.mutex);
3428 mutex_lock(&connection->resource->conf_update);
3429 old_net_conf = connection->net_conf;
3430 *new_net_conf = *old_net_conf;
3432 new_net_conf->wire_protocol = p_proto;
3433 new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3434 new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3435 new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3436 new_net_conf->two_primaries = p_two_primaries;
3438 rcu_assign_pointer(connection->net_conf, new_net_conf);
3439 mutex_unlock(&connection->resource->conf_update);
3440 mutex_unlock(&connection->data.mutex);
3442 crypto_free_ahash(connection->peer_integrity_tfm);
3443 kfree(connection->int_dig_in);
3444 kfree(connection->int_dig_vv);
3445 connection->peer_integrity_tfm = peer_integrity_tfm;
3446 connection->int_dig_in = int_dig_in;
3447 connection->int_dig_vv = int_dig_vv;
3449 if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3450 drbd_info(connection, "peer data-integrity-alg: %s\n",
3451 integrity_alg[0] ? integrity_alg : "(none)");
3453 synchronize_rcu();
3454 kfree(old_net_conf);
3455 return 0;
3457 disconnect_rcu_unlock:
3458 rcu_read_unlock();
3459 disconnect:
3460 crypto_free_ahash(peer_integrity_tfm);
3461 kfree(int_dig_in);
3462 kfree(int_dig_vv);
3463 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3464 return -EIO;
3467 /* helper function
3468 * input: alg name, feature name
3469 * return: NULL (alg name was "")
3470 * ERR_PTR(error) if something goes wrong
3471 * or the crypto hash ptr, if it worked out ok. */
3472 static struct crypto_ahash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
3473 const char *alg, const char *name)
3475 struct crypto_ahash *tfm;
3477 if (!alg[0])
3478 return NULL;
3480 tfm = crypto_alloc_ahash(alg, 0, CRYPTO_ALG_ASYNC);
3481 if (IS_ERR(tfm)) {
3482 drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3483 alg, name, PTR_ERR(tfm));
3484 return tfm;
3486 return tfm;
3489 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3491 void *buffer = connection->data.rbuf;
3492 int size = pi->size;
3494 while (size) {
3495 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3496 s = drbd_recv(connection, buffer, s);
3497 if (s <= 0) {
3498 if (s < 0)
3499 return s;
3500 break;
3502 size -= s;
3504 if (size)
3505 return -EIO;
3506 return 0;
3510 * config_unknown_volume - device configuration command for unknown volume
3512 * When a device is added to an existing connection, the node on which the
3513 * device is added first will send configuration commands to its peer but the
3514 * peer will not know about the device yet. It will warn and ignore these
3515 * commands. Once the device is added on the second node, the second node will
3516 * send the same device configuration commands, but in the other direction.
3518 * (We can also end up here if drbd is misconfigured.)
3520 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3522 drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3523 cmdname(pi->cmd), pi->vnr);
3524 return ignore_remaining_packet(connection, pi);
3527 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3529 struct drbd_peer_device *peer_device;
3530 struct drbd_device *device;
3531 struct p_rs_param_95 *p;
3532 unsigned int header_size, data_size, exp_max_sz;
3533 struct crypto_ahash *verify_tfm = NULL;
3534 struct crypto_ahash *csums_tfm = NULL;
3535 struct net_conf *old_net_conf, *new_net_conf = NULL;
3536 struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3537 const int apv = connection->agreed_pro_version;
3538 struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3539 int fifo_size = 0;
3540 int err;
3542 peer_device = conn_peer_device(connection, pi->vnr);
3543 if (!peer_device)
3544 return config_unknown_volume(connection, pi);
3545 device = peer_device->device;
3547 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
3548 : apv == 88 ? sizeof(struct p_rs_param)
3549 + SHARED_SECRET_MAX
3550 : apv <= 94 ? sizeof(struct p_rs_param_89)
3551 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3553 if (pi->size > exp_max_sz) {
3554 drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3555 pi->size, exp_max_sz);
3556 return -EIO;
3559 if (apv <= 88) {
3560 header_size = sizeof(struct p_rs_param);
3561 data_size = pi->size - header_size;
3562 } else if (apv <= 94) {
3563 header_size = sizeof(struct p_rs_param_89);
3564 data_size = pi->size - header_size;
3565 D_ASSERT(device, data_size == 0);
3566 } else {
3567 header_size = sizeof(struct p_rs_param_95);
3568 data_size = pi->size - header_size;
3569 D_ASSERT(device, data_size == 0);
3572 /* initialize verify_alg and csums_alg */
3573 p = pi->data;
3574 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3576 err = drbd_recv_all(peer_device->connection, p, header_size);
3577 if (err)
3578 return err;
3580 mutex_lock(&connection->resource->conf_update);
3581 old_net_conf = peer_device->connection->net_conf;
3582 if (get_ldev(device)) {
3583 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3584 if (!new_disk_conf) {
3585 put_ldev(device);
3586 mutex_unlock(&connection->resource->conf_update);
3587 drbd_err(device, "Allocation of new disk_conf failed\n");
3588 return -ENOMEM;
3591 old_disk_conf = device->ldev->disk_conf;
3592 *new_disk_conf = *old_disk_conf;
3594 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3597 if (apv >= 88) {
3598 if (apv == 88) {
3599 if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3600 drbd_err(device, "verify-alg of wrong size, "
3601 "peer wants %u, accepting only up to %u byte\n",
3602 data_size, SHARED_SECRET_MAX);
3603 err = -EIO;
3604 goto reconnect;
3607 err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3608 if (err)
3609 goto reconnect;
3610 /* we expect NUL terminated string */
3611 /* but just in case someone tries to be evil */
3612 D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3613 p->verify_alg[data_size-1] = 0;
3615 } else /* apv >= 89 */ {
3616 /* we still expect NUL terminated strings */
3617 /* but just in case someone tries to be evil */
3618 D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3619 D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3620 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3621 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3624 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3625 if (device->state.conn == C_WF_REPORT_PARAMS) {
3626 drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3627 old_net_conf->verify_alg, p->verify_alg);
3628 goto disconnect;
3630 verify_tfm = drbd_crypto_alloc_digest_safe(device,
3631 p->verify_alg, "verify-alg");
3632 if (IS_ERR(verify_tfm)) {
3633 verify_tfm = NULL;
3634 goto disconnect;
3638 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3639 if (device->state.conn == C_WF_REPORT_PARAMS) {
3640 drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3641 old_net_conf->csums_alg, p->csums_alg);
3642 goto disconnect;
3644 csums_tfm = drbd_crypto_alloc_digest_safe(device,
3645 p->csums_alg, "csums-alg");
3646 if (IS_ERR(csums_tfm)) {
3647 csums_tfm = NULL;
3648 goto disconnect;
3652 if (apv > 94 && new_disk_conf) {
3653 new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3654 new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3655 new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3656 new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3658 fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3659 if (fifo_size != device->rs_plan_s->size) {
3660 new_plan = fifo_alloc(fifo_size);
3661 if (!new_plan) {
3662 drbd_err(device, "kmalloc of fifo_buffer failed");
3663 put_ldev(device);
3664 goto disconnect;
3669 if (verify_tfm || csums_tfm) {
3670 new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3671 if (!new_net_conf) {
3672 drbd_err(device, "Allocation of new net_conf failed\n");
3673 goto disconnect;
3676 *new_net_conf = *old_net_conf;
3678 if (verify_tfm) {
3679 strcpy(new_net_conf->verify_alg, p->verify_alg);
3680 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3681 crypto_free_ahash(peer_device->connection->verify_tfm);
3682 peer_device->connection->verify_tfm = verify_tfm;
3683 drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3685 if (csums_tfm) {
3686 strcpy(new_net_conf->csums_alg, p->csums_alg);
3687 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3688 crypto_free_ahash(peer_device->connection->csums_tfm);
3689 peer_device->connection->csums_tfm = csums_tfm;
3690 drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3692 rcu_assign_pointer(connection->net_conf, new_net_conf);
3696 if (new_disk_conf) {
3697 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3698 put_ldev(device);
3701 if (new_plan) {
3702 old_plan = device->rs_plan_s;
3703 rcu_assign_pointer(device->rs_plan_s, new_plan);
3706 mutex_unlock(&connection->resource->conf_update);
3707 synchronize_rcu();
3708 if (new_net_conf)
3709 kfree(old_net_conf);
3710 kfree(old_disk_conf);
3711 kfree(old_plan);
3713 return 0;
3715 reconnect:
3716 if (new_disk_conf) {
3717 put_ldev(device);
3718 kfree(new_disk_conf);
3720 mutex_unlock(&connection->resource->conf_update);
3721 return -EIO;
3723 disconnect:
3724 kfree(new_plan);
3725 if (new_disk_conf) {
3726 put_ldev(device);
3727 kfree(new_disk_conf);
3729 mutex_unlock(&connection->resource->conf_update);
3730 /* just for completeness: actually not needed,
3731 * as this is not reached if csums_tfm was ok. */
3732 crypto_free_ahash(csums_tfm);
3733 /* but free the verify_tfm again, if csums_tfm did not work out */
3734 crypto_free_ahash(verify_tfm);
3735 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3736 return -EIO;
3739 /* warn if the arguments differ by more than 12.5% */
3740 static void warn_if_differ_considerably(struct drbd_device *device,
3741 const char *s, sector_t a, sector_t b)
3743 sector_t d;
3744 if (a == 0 || b == 0)
3745 return;
3746 d = (a > b) ? (a - b) : (b - a);
3747 if (d > (a>>3) || d > (b>>3))
3748 drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
3749 (unsigned long long)a, (unsigned long long)b);
3752 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
3754 struct drbd_peer_device *peer_device;
3755 struct drbd_device *device;
3756 struct p_sizes *p = pi->data;
3757 enum determine_dev_size dd = DS_UNCHANGED;
3758 sector_t p_size, p_usize, p_csize, my_usize;
3759 int ldsc = 0; /* local disk size changed */
3760 enum dds_flags ddsf;
3762 peer_device = conn_peer_device(connection, pi->vnr);
3763 if (!peer_device)
3764 return config_unknown_volume(connection, pi);
3765 device = peer_device->device;
3767 p_size = be64_to_cpu(p->d_size);
3768 p_usize = be64_to_cpu(p->u_size);
3769 p_csize = be64_to_cpu(p->c_size);
3771 /* just store the peer's disk size for now.
3772 * we still need to figure out whether we accept that. */
3773 device->p_size = p_size;
3775 if (get_ldev(device)) {
3776 rcu_read_lock();
3777 my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
3778 rcu_read_unlock();
3780 warn_if_differ_considerably(device, "lower level device sizes",
3781 p_size, drbd_get_max_capacity(device->ldev));
3782 warn_if_differ_considerably(device, "user requested size",
3783 p_usize, my_usize);
3785 /* if this is the first connect, or an otherwise expected
3786 * param exchange, choose the minimum */
3787 if (device->state.conn == C_WF_REPORT_PARAMS)
3788 p_usize = min_not_zero(my_usize, p_usize);
3790 /* Never shrink a device with usable data during connect.
3791 But allow online shrinking if we are connected. */
3792 if (drbd_new_dev_size(device, device->ldev, p_usize, 0) <
3793 drbd_get_capacity(device->this_bdev) &&
3794 device->state.disk >= D_OUTDATED &&
3795 device->state.conn < C_CONNECTED) {
3796 drbd_err(device, "The peer's disk size is too small!\n");
3797 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3798 put_ldev(device);
3799 return -EIO;
3802 if (my_usize != p_usize) {
3803 struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3805 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3806 if (!new_disk_conf) {
3807 drbd_err(device, "Allocation of new disk_conf failed\n");
3808 put_ldev(device);
3809 return -ENOMEM;
3812 mutex_lock(&connection->resource->conf_update);
3813 old_disk_conf = device->ldev->disk_conf;
3814 *new_disk_conf = *old_disk_conf;
3815 new_disk_conf->disk_size = p_usize;
3817 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3818 mutex_unlock(&connection->resource->conf_update);
3819 synchronize_rcu();
3820 kfree(old_disk_conf);
3822 drbd_info(device, "Peer sets u_size to %lu sectors\n",
3823 (unsigned long)my_usize);
3826 put_ldev(device);
3829 device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3830 /* Leave drbd_reconsider_max_bio_size() before drbd_determine_dev_size().
3831 In case we cleared the QUEUE_FLAG_DISCARD from our queue in
3832 drbd_reconsider_max_bio_size(), we can be sure that after
3833 drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
3835 ddsf = be16_to_cpu(p->dds_flags);
3836 if (get_ldev(device)) {
3837 drbd_reconsider_max_bio_size(device, device->ldev);
3838 dd = drbd_determine_dev_size(device, ddsf, NULL);
3839 put_ldev(device);
3840 if (dd == DS_ERROR)
3841 return -EIO;
3842 drbd_md_sync(device);
3843 } else {
3845 * I am diskless, need to accept the peer's *current* size.
3846 * I must NOT accept the peers backing disk size,
3847 * it may have been larger than mine all along...
3849 * At this point, the peer knows more about my disk, or at
3850 * least about what we last agreed upon, than myself.
3851 * So if his c_size is less than his d_size, the most likely
3852 * reason is that *my* d_size was smaller last time we checked.
3854 * However, if he sends a zero current size,
3855 * take his (user-capped or) backing disk size anyways.
3857 drbd_reconsider_max_bio_size(device, NULL);
3858 drbd_set_my_capacity(device, p_csize ?: p_usize ?: p_size);
3861 if (get_ldev(device)) {
3862 if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
3863 device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
3864 ldsc = 1;
3867 put_ldev(device);
3870 if (device->state.conn > C_WF_REPORT_PARAMS) {
3871 if (be64_to_cpu(p->c_size) !=
3872 drbd_get_capacity(device->this_bdev) || ldsc) {
3873 /* we have different sizes, probably peer
3874 * needs to know my new size... */
3875 drbd_send_sizes(peer_device, 0, ddsf);
3877 if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
3878 (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
3879 if (device->state.pdsk >= D_INCONSISTENT &&
3880 device->state.disk >= D_INCONSISTENT) {
3881 if (ddsf & DDSF_NO_RESYNC)
3882 drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
3883 else
3884 resync_after_online_grow(device);
3885 } else
3886 set_bit(RESYNC_AFTER_NEG, &device->flags);
3890 return 0;
3893 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
3895 struct drbd_peer_device *peer_device;
3896 struct drbd_device *device;
3897 struct p_uuids *p = pi->data;
3898 u64 *p_uuid;
3899 int i, updated_uuids = 0;
3901 peer_device = conn_peer_device(connection, pi->vnr);
3902 if (!peer_device)
3903 return config_unknown_volume(connection, pi);
3904 device = peer_device->device;
3906 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3907 if (!p_uuid) {
3908 drbd_err(device, "kmalloc of p_uuid failed\n");
3909 return false;
3912 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3913 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3915 kfree(device->p_uuid);
3916 device->p_uuid = p_uuid;
3918 if (device->state.conn < C_CONNECTED &&
3919 device->state.disk < D_INCONSISTENT &&
3920 device->state.role == R_PRIMARY &&
3921 (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3922 drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
3923 (unsigned long long)device->ed_uuid);
3924 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3925 return -EIO;
3928 if (get_ldev(device)) {
3929 int skip_initial_sync =
3930 device->state.conn == C_CONNECTED &&
3931 peer_device->connection->agreed_pro_version >= 90 &&
3932 device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3933 (p_uuid[UI_FLAGS] & 8);
3934 if (skip_initial_sync) {
3935 drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
3936 drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
3937 "clear_n_write from receive_uuids",
3938 BM_LOCKED_TEST_ALLOWED);
3939 _drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
3940 _drbd_uuid_set(device, UI_BITMAP, 0);
3941 _drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3942 CS_VERBOSE, NULL);
3943 drbd_md_sync(device);
3944 updated_uuids = 1;
3946 put_ldev(device);
3947 } else if (device->state.disk < D_INCONSISTENT &&
3948 device->state.role == R_PRIMARY) {
3949 /* I am a diskless primary, the peer just created a new current UUID
3950 for me. */
3951 updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3954 /* Before we test for the disk state, we should wait until an eventually
3955 ongoing cluster wide state change is finished. That is important if
3956 we are primary and are detaching from our disk. We need to see the
3957 new disk state... */
3958 mutex_lock(device->state_mutex);
3959 mutex_unlock(device->state_mutex);
3960 if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
3961 updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3963 if (updated_uuids)
3964 drbd_print_uuids(device, "receiver updated UUIDs to");
3966 return 0;
3970 * convert_state() - Converts the peer's view of the cluster state to our point of view
3971 * @ps: The state as seen by the peer.
3973 static union drbd_state convert_state(union drbd_state ps)
3975 union drbd_state ms;
3977 static enum drbd_conns c_tab[] = {
3978 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3979 [C_CONNECTED] = C_CONNECTED,
3981 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3982 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3983 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3984 [C_VERIFY_S] = C_VERIFY_T,
3985 [C_MASK] = C_MASK,
3988 ms.i = ps.i;
3990 ms.conn = c_tab[ps.conn];
3991 ms.peer = ps.role;
3992 ms.role = ps.peer;
3993 ms.pdsk = ps.disk;
3994 ms.disk = ps.pdsk;
3995 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3997 return ms;
4000 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
4002 struct drbd_peer_device *peer_device;
4003 struct drbd_device *device;
4004 struct p_req_state *p = pi->data;
4005 union drbd_state mask, val;
4006 enum drbd_state_rv rv;
4008 peer_device = conn_peer_device(connection, pi->vnr);
4009 if (!peer_device)
4010 return -EIO;
4011 device = peer_device->device;
4013 mask.i = be32_to_cpu(p->mask);
4014 val.i = be32_to_cpu(p->val);
4016 if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
4017 mutex_is_locked(device->state_mutex)) {
4018 drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
4019 return 0;
4022 mask = convert_state(mask);
4023 val = convert_state(val);
4025 rv = drbd_change_state(device, CS_VERBOSE, mask, val);
4026 drbd_send_sr_reply(peer_device, rv);
4028 drbd_md_sync(device);
4030 return 0;
4033 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
4035 struct p_req_state *p = pi->data;
4036 union drbd_state mask, val;
4037 enum drbd_state_rv rv;
4039 mask.i = be32_to_cpu(p->mask);
4040 val.i = be32_to_cpu(p->val);
4042 if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4043 mutex_is_locked(&connection->cstate_mutex)) {
4044 conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
4045 return 0;
4048 mask = convert_state(mask);
4049 val = convert_state(val);
4051 rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4052 conn_send_sr_reply(connection, rv);
4054 return 0;
4057 static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
4059 struct drbd_peer_device *peer_device;
4060 struct drbd_device *device;
4061 struct p_state *p = pi->data;
4062 union drbd_state os, ns, peer_state;
4063 enum drbd_disk_state real_peer_disk;
4064 enum chg_state_flags cs_flags;
4065 int rv;
4067 peer_device = conn_peer_device(connection, pi->vnr);
4068 if (!peer_device)
4069 return config_unknown_volume(connection, pi);
4070 device = peer_device->device;
4072 peer_state.i = be32_to_cpu(p->state);
4074 real_peer_disk = peer_state.disk;
4075 if (peer_state.disk == D_NEGOTIATING) {
4076 real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
4077 drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
4080 spin_lock_irq(&device->resource->req_lock);
4081 retry:
4082 os = ns = drbd_read_state(device);
4083 spin_unlock_irq(&device->resource->req_lock);
4085 /* If some other part of the code (ack_receiver thread, timeout)
4086 * already decided to close the connection again,
4087 * we must not "re-establish" it here. */
4088 if (os.conn <= C_TEAR_DOWN)
4089 return -ECONNRESET;
4091 /* If this is the "end of sync" confirmation, usually the peer disk
4092 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
4093 * set) resync started in PausedSyncT, or if the timing of pause-/
4094 * unpause-sync events has been "just right", the peer disk may
4095 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
4097 if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
4098 real_peer_disk == D_UP_TO_DATE &&
4099 os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
4100 /* If we are (becoming) SyncSource, but peer is still in sync
4101 * preparation, ignore its uptodate-ness to avoid flapping, it
4102 * will change to inconsistent once the peer reaches active
4103 * syncing states.
4104 * It may have changed syncer-paused flags, however, so we
4105 * cannot ignore this completely. */
4106 if (peer_state.conn > C_CONNECTED &&
4107 peer_state.conn < C_SYNC_SOURCE)
4108 real_peer_disk = D_INCONSISTENT;
4110 /* if peer_state changes to connected at the same time,
4111 * it explicitly notifies us that it finished resync.
4112 * Maybe we should finish it up, too? */
4113 else if (os.conn >= C_SYNC_SOURCE &&
4114 peer_state.conn == C_CONNECTED) {
4115 if (drbd_bm_total_weight(device) <= device->rs_failed)
4116 drbd_resync_finished(device);
4117 return 0;
4121 /* explicit verify finished notification, stop sector reached. */
4122 if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
4123 peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
4124 ov_out_of_sync_print(device);
4125 drbd_resync_finished(device);
4126 return 0;
4129 /* peer says his disk is inconsistent, while we think it is uptodate,
4130 * and this happens while the peer still thinks we have a sync going on,
4131 * but we think we are already done with the sync.
4132 * We ignore this to avoid flapping pdsk.
4133 * This should not happen, if the peer is a recent version of drbd. */
4134 if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4135 os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4136 real_peer_disk = D_UP_TO_DATE;
4138 if (ns.conn == C_WF_REPORT_PARAMS)
4139 ns.conn = C_CONNECTED;
4141 if (peer_state.conn == C_AHEAD)
4142 ns.conn = C_BEHIND;
4144 if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4145 get_ldev_if_state(device, D_NEGOTIATING)) {
4146 int cr; /* consider resync */
4148 /* if we established a new connection */
4149 cr = (os.conn < C_CONNECTED);
4150 /* if we had an established connection
4151 * and one of the nodes newly attaches a disk */
4152 cr |= (os.conn == C_CONNECTED &&
4153 (peer_state.disk == D_NEGOTIATING ||
4154 os.disk == D_NEGOTIATING));
4155 /* if we have both been inconsistent, and the peer has been
4156 * forced to be UpToDate with --overwrite-data */
4157 cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4158 /* if we had been plain connected, and the admin requested to
4159 * start a sync by "invalidate" or "invalidate-remote" */
4160 cr |= (os.conn == C_CONNECTED &&
4161 (peer_state.conn >= C_STARTING_SYNC_S &&
4162 peer_state.conn <= C_WF_BITMAP_T));
4164 if (cr)
4165 ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
4167 put_ldev(device);
4168 if (ns.conn == C_MASK) {
4169 ns.conn = C_CONNECTED;
4170 if (device->state.disk == D_NEGOTIATING) {
4171 drbd_force_state(device, NS(disk, D_FAILED));
4172 } else if (peer_state.disk == D_NEGOTIATING) {
4173 drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4174 peer_state.disk = D_DISKLESS;
4175 real_peer_disk = D_DISKLESS;
4176 } else {
4177 if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4178 return -EIO;
4179 D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4180 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4181 return -EIO;
4186 spin_lock_irq(&device->resource->req_lock);
4187 if (os.i != drbd_read_state(device).i)
4188 goto retry;
4189 clear_bit(CONSIDER_RESYNC, &device->flags);
4190 ns.peer = peer_state.role;
4191 ns.pdsk = real_peer_disk;
4192 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4193 if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4194 ns.disk = device->new_state_tmp.disk;
4195 cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4196 if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4197 test_bit(NEW_CUR_UUID, &device->flags)) {
4198 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4199 for temporal network outages! */
4200 spin_unlock_irq(&device->resource->req_lock);
4201 drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4202 tl_clear(peer_device->connection);
4203 drbd_uuid_new_current(device);
4204 clear_bit(NEW_CUR_UUID, &device->flags);
4205 conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4206 return -EIO;
4208 rv = _drbd_set_state(device, ns, cs_flags, NULL);
4209 ns = drbd_read_state(device);
4210 spin_unlock_irq(&device->resource->req_lock);
4212 if (rv < SS_SUCCESS) {
4213 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4214 return -EIO;
4217 if (os.conn > C_WF_REPORT_PARAMS) {
4218 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4219 peer_state.disk != D_NEGOTIATING ) {
4220 /* we want resync, peer has not yet decided to sync... */
4221 /* Nowadays only used when forcing a node into primary role and
4222 setting its disk to UpToDate with that */
4223 drbd_send_uuids(peer_device);
4224 drbd_send_current_state(peer_device);
4228 clear_bit(DISCARD_MY_DATA, &device->flags);
4230 drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4232 return 0;
4235 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4237 struct drbd_peer_device *peer_device;
4238 struct drbd_device *device;
4239 struct p_rs_uuid *p = pi->data;
4241 peer_device = conn_peer_device(connection, pi->vnr);
4242 if (!peer_device)
4243 return -EIO;
4244 device = peer_device->device;
4246 wait_event(device->misc_wait,
4247 device->state.conn == C_WF_SYNC_UUID ||
4248 device->state.conn == C_BEHIND ||
4249 device->state.conn < C_CONNECTED ||
4250 device->state.disk < D_NEGOTIATING);
4252 /* D_ASSERT(device, device->state.conn == C_WF_SYNC_UUID ); */
4254 /* Here the _drbd_uuid_ functions are right, current should
4255 _not_ be rotated into the history */
4256 if (get_ldev_if_state(device, D_NEGOTIATING)) {
4257 _drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4258 _drbd_uuid_set(device, UI_BITMAP, 0UL);
4260 drbd_print_uuids(device, "updated sync uuid");
4261 drbd_start_resync(device, C_SYNC_TARGET);
4263 put_ldev(device);
4264 } else
4265 drbd_err(device, "Ignoring SyncUUID packet!\n");
4267 return 0;
4271 * receive_bitmap_plain
4273 * Return 0 when done, 1 when another iteration is needed, and a negative error
4274 * code upon failure.
4276 static int
4277 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4278 unsigned long *p, struct bm_xfer_ctx *c)
4280 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4281 drbd_header_size(peer_device->connection);
4282 unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4283 c->bm_words - c->word_offset);
4284 unsigned int want = num_words * sizeof(*p);
4285 int err;
4287 if (want != size) {
4288 drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4289 return -EIO;
4291 if (want == 0)
4292 return 0;
4293 err = drbd_recv_all(peer_device->connection, p, want);
4294 if (err)
4295 return err;
4297 drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4299 c->word_offset += num_words;
4300 c->bit_offset = c->word_offset * BITS_PER_LONG;
4301 if (c->bit_offset > c->bm_bits)
4302 c->bit_offset = c->bm_bits;
4304 return 1;
4307 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4309 return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4312 static int dcbp_get_start(struct p_compressed_bm *p)
4314 return (p->encoding & 0x80) != 0;
4317 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4319 return (p->encoding >> 4) & 0x7;
4323 * recv_bm_rle_bits
4325 * Return 0 when done, 1 when another iteration is needed, and a negative error
4326 * code upon failure.
4328 static int
4329 recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4330 struct p_compressed_bm *p,
4331 struct bm_xfer_ctx *c,
4332 unsigned int len)
4334 struct bitstream bs;
4335 u64 look_ahead;
4336 u64 rl;
4337 u64 tmp;
4338 unsigned long s = c->bit_offset;
4339 unsigned long e;
4340 int toggle = dcbp_get_start(p);
4341 int have;
4342 int bits;
4344 bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4346 bits = bitstream_get_bits(&bs, &look_ahead, 64);
4347 if (bits < 0)
4348 return -EIO;
4350 for (have = bits; have > 0; s += rl, toggle = !toggle) {
4351 bits = vli_decode_bits(&rl, look_ahead);
4352 if (bits <= 0)
4353 return -EIO;
4355 if (toggle) {
4356 e = s + rl -1;
4357 if (e >= c->bm_bits) {
4358 drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4359 return -EIO;
4361 _drbd_bm_set_bits(peer_device->device, s, e);
4364 if (have < bits) {
4365 drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4366 have, bits, look_ahead,
4367 (unsigned int)(bs.cur.b - p->code),
4368 (unsigned int)bs.buf_len);
4369 return -EIO;
4371 /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4372 if (likely(bits < 64))
4373 look_ahead >>= bits;
4374 else
4375 look_ahead = 0;
4376 have -= bits;
4378 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4379 if (bits < 0)
4380 return -EIO;
4381 look_ahead |= tmp << have;
4382 have += bits;
4385 c->bit_offset = s;
4386 bm_xfer_ctx_bit_to_word_offset(c);
4388 return (s != c->bm_bits);
4392 * decode_bitmap_c
4394 * Return 0 when done, 1 when another iteration is needed, and a negative error
4395 * code upon failure.
4397 static int
4398 decode_bitmap_c(struct drbd_peer_device *peer_device,
4399 struct p_compressed_bm *p,
4400 struct bm_xfer_ctx *c,
4401 unsigned int len)
4403 if (dcbp_get_code(p) == RLE_VLI_Bits)
4404 return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4406 /* other variants had been implemented for evaluation,
4407 * but have been dropped as this one turned out to be "best"
4408 * during all our tests. */
4410 drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4411 conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4412 return -EIO;
4415 void INFO_bm_xfer_stats(struct drbd_device *device,
4416 const char *direction, struct bm_xfer_ctx *c)
4418 /* what would it take to transfer it "plaintext" */
4419 unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4420 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4421 unsigned int plain =
4422 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4423 c->bm_words * sizeof(unsigned long);
4424 unsigned int total = c->bytes[0] + c->bytes[1];
4425 unsigned int r;
4427 /* total can not be zero. but just in case: */
4428 if (total == 0)
4429 return;
4431 /* don't report if not compressed */
4432 if (total >= plain)
4433 return;
4435 /* total < plain. check for overflow, still */
4436 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4437 : (1000 * total / plain);
4439 if (r > 1000)
4440 r = 1000;
4442 r = 1000 - r;
4443 drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4444 "total %u; compression: %u.%u%%\n",
4445 direction,
4446 c->bytes[1], c->packets[1],
4447 c->bytes[0], c->packets[0],
4448 total, r/10, r % 10);
4451 /* Since we are processing the bitfield from lower addresses to higher,
4452 it does not matter if the process it in 32 bit chunks or 64 bit
4453 chunks as long as it is little endian. (Understand it as byte stream,
4454 beginning with the lowest byte...) If we would use big endian
4455 we would need to process it from the highest address to the lowest,
4456 in order to be agnostic to the 32 vs 64 bits issue.
4458 returns 0 on failure, 1 if we successfully received it. */
4459 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4461 struct drbd_peer_device *peer_device;
4462 struct drbd_device *device;
4463 struct bm_xfer_ctx c;
4464 int err;
4466 peer_device = conn_peer_device(connection, pi->vnr);
4467 if (!peer_device)
4468 return -EIO;
4469 device = peer_device->device;
4471 drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4472 /* you are supposed to send additional out-of-sync information
4473 * if you actually set bits during this phase */
4475 c = (struct bm_xfer_ctx) {
4476 .bm_bits = drbd_bm_bits(device),
4477 .bm_words = drbd_bm_words(device),
4480 for(;;) {
4481 if (pi->cmd == P_BITMAP)
4482 err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4483 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4484 /* MAYBE: sanity check that we speak proto >= 90,
4485 * and the feature is enabled! */
4486 struct p_compressed_bm *p = pi->data;
4488 if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4489 drbd_err(device, "ReportCBitmap packet too large\n");
4490 err = -EIO;
4491 goto out;
4493 if (pi->size <= sizeof(*p)) {
4494 drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4495 err = -EIO;
4496 goto out;
4498 err = drbd_recv_all(peer_device->connection, p, pi->size);
4499 if (err)
4500 goto out;
4501 err = decode_bitmap_c(peer_device, p, &c, pi->size);
4502 } else {
4503 drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
4504 err = -EIO;
4505 goto out;
4508 c.packets[pi->cmd == P_BITMAP]++;
4509 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4511 if (err <= 0) {
4512 if (err < 0)
4513 goto out;
4514 break;
4516 err = drbd_recv_header(peer_device->connection, pi);
4517 if (err)
4518 goto out;
4521 INFO_bm_xfer_stats(device, "receive", &c);
4523 if (device->state.conn == C_WF_BITMAP_T) {
4524 enum drbd_state_rv rv;
4526 err = drbd_send_bitmap(device);
4527 if (err)
4528 goto out;
4529 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4530 rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4531 D_ASSERT(device, rv == SS_SUCCESS);
4532 } else if (device->state.conn != C_WF_BITMAP_S) {
4533 /* admin may have requested C_DISCONNECTING,
4534 * other threads may have noticed network errors */
4535 drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4536 drbd_conn_str(device->state.conn));
4538 err = 0;
4540 out:
4541 drbd_bm_unlock(device);
4542 if (!err && device->state.conn == C_WF_BITMAP_S)
4543 drbd_start_resync(device, C_SYNC_SOURCE);
4544 return err;
4547 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4549 drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4550 pi->cmd, pi->size);
4552 return ignore_remaining_packet(connection, pi);
4555 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4557 /* Make sure we've acked all the TCP data associated
4558 * with the data requests being unplugged */
4559 drbd_tcp_quickack(connection->data.socket);
4561 return 0;
4564 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4566 struct drbd_peer_device *peer_device;
4567 struct drbd_device *device;
4568 struct p_block_desc *p = pi->data;
4570 peer_device = conn_peer_device(connection, pi->vnr);
4571 if (!peer_device)
4572 return -EIO;
4573 device = peer_device->device;
4575 switch (device->state.conn) {
4576 case C_WF_SYNC_UUID:
4577 case C_WF_BITMAP_T:
4578 case C_BEHIND:
4579 break;
4580 default:
4581 drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4582 drbd_conn_str(device->state.conn));
4585 drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4587 return 0;
4590 struct data_cmd {
4591 int expect_payload;
4592 size_t pkt_size;
4593 int (*fn)(struct drbd_connection *, struct packet_info *);
4596 static struct data_cmd drbd_cmd_handler[] = {
4597 [P_DATA] = { 1, sizeof(struct p_data), receive_Data },
4598 [P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply },
4599 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4600 [P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4601 [P_BITMAP] = { 1, 0, receive_bitmap } ,
4602 [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4603 [P_UNPLUG_REMOTE] = { 0, 0, receive_UnplugRemote },
4604 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4605 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4606 [P_SYNC_PARAM] = { 1, 0, receive_SyncParam },
4607 [P_SYNC_PARAM89] = { 1, 0, receive_SyncParam },
4608 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol },
4609 [P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids },
4610 [P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes },
4611 [P_STATE] = { 0, sizeof(struct p_state), receive_state },
4612 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state },
4613 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4614 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4615 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4616 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4617 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip },
4618 [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4619 [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4620 [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4621 [P_TRIM] = { 0, sizeof(struct p_trim), receive_Data },
4624 static void drbdd(struct drbd_connection *connection)
4626 struct packet_info pi;
4627 size_t shs; /* sub header size */
4628 int err;
4630 while (get_t_state(&connection->receiver) == RUNNING) {
4631 struct data_cmd *cmd;
4633 drbd_thread_current_set_cpu(&connection->receiver);
4634 update_receiver_timing_details(connection, drbd_recv_header);
4635 if (drbd_recv_header(connection, &pi))
4636 goto err_out;
4638 cmd = &drbd_cmd_handler[pi.cmd];
4639 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4640 drbd_err(connection, "Unexpected data packet %s (0x%04x)",
4641 cmdname(pi.cmd), pi.cmd);
4642 goto err_out;
4645 shs = cmd->pkt_size;
4646 if (pi.size > shs && !cmd->expect_payload) {
4647 drbd_err(connection, "No payload expected %s l:%d\n",
4648 cmdname(pi.cmd), pi.size);
4649 goto err_out;
4652 if (shs) {
4653 update_receiver_timing_details(connection, drbd_recv_all_warn);
4654 err = drbd_recv_all_warn(connection, pi.data, shs);
4655 if (err)
4656 goto err_out;
4657 pi.size -= shs;
4660 update_receiver_timing_details(connection, cmd->fn);
4661 err = cmd->fn(connection, &pi);
4662 if (err) {
4663 drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
4664 cmdname(pi.cmd), err, pi.size);
4665 goto err_out;
4668 return;
4670 err_out:
4671 conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4674 static void conn_disconnect(struct drbd_connection *connection)
4676 struct drbd_peer_device *peer_device;
4677 enum drbd_conns oc;
4678 int vnr;
4680 if (connection->cstate == C_STANDALONE)
4681 return;
4683 /* We are about to start the cleanup after connection loss.
4684 * Make sure drbd_make_request knows about that.
4685 * Usually we should be in some network failure state already,
4686 * but just in case we are not, we fix it up here.
4688 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4690 /* ack_receiver does not clean up anything. it must not interfere, either */
4691 drbd_thread_stop(&connection->ack_receiver);
4692 if (connection->ack_sender) {
4693 destroy_workqueue(connection->ack_sender);
4694 connection->ack_sender = NULL;
4696 drbd_free_sock(connection);
4698 rcu_read_lock();
4699 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
4700 struct drbd_device *device = peer_device->device;
4701 kref_get(&device->kref);
4702 rcu_read_unlock();
4703 drbd_disconnected(peer_device);
4704 kref_put(&device->kref, drbd_destroy_device);
4705 rcu_read_lock();
4707 rcu_read_unlock();
4709 if (!list_empty(&connection->current_epoch->list))
4710 drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
4711 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4712 atomic_set(&connection->current_epoch->epoch_size, 0);
4713 connection->send.seen_any_write_yet = false;
4715 drbd_info(connection, "Connection closed\n");
4717 if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
4718 conn_try_outdate_peer_async(connection);
4720 spin_lock_irq(&connection->resource->req_lock);
4721 oc = connection->cstate;
4722 if (oc >= C_UNCONNECTED)
4723 _conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4725 spin_unlock_irq(&connection->resource->req_lock);
4727 if (oc == C_DISCONNECTING)
4728 conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4731 static int drbd_disconnected(struct drbd_peer_device *peer_device)
4733 struct drbd_device *device = peer_device->device;
4734 unsigned int i;
4736 /* wait for current activity to cease. */
4737 spin_lock_irq(&device->resource->req_lock);
4738 _drbd_wait_ee_list_empty(device, &device->active_ee);
4739 _drbd_wait_ee_list_empty(device, &device->sync_ee);
4740 _drbd_wait_ee_list_empty(device, &device->read_ee);
4741 spin_unlock_irq(&device->resource->req_lock);
4743 /* We do not have data structures that would allow us to
4744 * get the rs_pending_cnt down to 0 again.
4745 * * On C_SYNC_TARGET we do not have any data structures describing
4746 * the pending RSDataRequest's we have sent.
4747 * * On C_SYNC_SOURCE there is no data structure that tracks
4748 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4749 * And no, it is not the sum of the reference counts in the
4750 * resync_LRU. The resync_LRU tracks the whole operation including
4751 * the disk-IO, while the rs_pending_cnt only tracks the blocks
4752 * on the fly. */
4753 drbd_rs_cancel_all(device);
4754 device->rs_total = 0;
4755 device->rs_failed = 0;
4756 atomic_set(&device->rs_pending_cnt, 0);
4757 wake_up(&device->misc_wait);
4759 del_timer_sync(&device->resync_timer);
4760 resync_timer_fn((unsigned long)device);
4762 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4763 * w_make_resync_request etc. which may still be on the worker queue
4764 * to be "canceled" */
4765 drbd_flush_workqueue(&peer_device->connection->sender_work);
4767 drbd_finish_peer_reqs(device);
4769 /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
4770 might have issued a work again. The one before drbd_finish_peer_reqs() is
4771 necessary to reclain net_ee in drbd_finish_peer_reqs(). */
4772 drbd_flush_workqueue(&peer_device->connection->sender_work);
4774 /* need to do it again, drbd_finish_peer_reqs() may have populated it
4775 * again via drbd_try_clear_on_disk_bm(). */
4776 drbd_rs_cancel_all(device);
4778 kfree(device->p_uuid);
4779 device->p_uuid = NULL;
4781 if (!drbd_suspended(device))
4782 tl_clear(peer_device->connection);
4784 drbd_md_sync(device);
4786 /* serialize with bitmap writeout triggered by the state change,
4787 * if any. */
4788 wait_event(device->misc_wait, !test_bit(BITMAP_IO, &device->flags));
4790 /* tcp_close and release of sendpage pages can be deferred. I don't
4791 * want to use SO_LINGER, because apparently it can be deferred for
4792 * more than 20 seconds (longest time I checked).
4794 * Actually we don't care for exactly when the network stack does its
4795 * put_page(), but release our reference on these pages right here.
4797 i = drbd_free_peer_reqs(device, &device->net_ee);
4798 if (i)
4799 drbd_info(device, "net_ee not empty, killed %u entries\n", i);
4800 i = atomic_read(&device->pp_in_use_by_net);
4801 if (i)
4802 drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
4803 i = atomic_read(&device->pp_in_use);
4804 if (i)
4805 drbd_info(device, "pp_in_use = %d, expected 0\n", i);
4807 D_ASSERT(device, list_empty(&device->read_ee));
4808 D_ASSERT(device, list_empty(&device->active_ee));
4809 D_ASSERT(device, list_empty(&device->sync_ee));
4810 D_ASSERT(device, list_empty(&device->done_ee));
4812 return 0;
4816 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4817 * we can agree on is stored in agreed_pro_version.
4819 * feature flags and the reserved array should be enough room for future
4820 * enhancements of the handshake protocol, and possible plugins...
4822 * for now, they are expected to be zero, but ignored.
4824 static int drbd_send_features(struct drbd_connection *connection)
4826 struct drbd_socket *sock;
4827 struct p_connection_features *p;
4829 sock = &connection->data;
4830 p = conn_prepare_command(connection, sock);
4831 if (!p)
4832 return -EIO;
4833 memset(p, 0, sizeof(*p));
4834 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4835 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4836 p->feature_flags = cpu_to_be32(PRO_FEATURES);
4837 return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4841 * return values:
4842 * 1 yes, we have a valid connection
4843 * 0 oops, did not work out, please try again
4844 * -1 peer talks different language,
4845 * no point in trying again, please go standalone.
4847 static int drbd_do_features(struct drbd_connection *connection)
4849 /* ASSERT current == connection->receiver ... */
4850 struct p_connection_features *p;
4851 const int expect = sizeof(struct p_connection_features);
4852 struct packet_info pi;
4853 int err;
4855 err = drbd_send_features(connection);
4856 if (err)
4857 return 0;
4859 err = drbd_recv_header(connection, &pi);
4860 if (err)
4861 return 0;
4863 if (pi.cmd != P_CONNECTION_FEATURES) {
4864 drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4865 cmdname(pi.cmd), pi.cmd);
4866 return -1;
4869 if (pi.size != expect) {
4870 drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
4871 expect, pi.size);
4872 return -1;
4875 p = pi.data;
4876 err = drbd_recv_all_warn(connection, p, expect);
4877 if (err)
4878 return 0;
4880 p->protocol_min = be32_to_cpu(p->protocol_min);
4881 p->protocol_max = be32_to_cpu(p->protocol_max);
4882 if (p->protocol_max == 0)
4883 p->protocol_max = p->protocol_min;
4885 if (PRO_VERSION_MAX < p->protocol_min ||
4886 PRO_VERSION_MIN > p->protocol_max)
4887 goto incompat;
4889 connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4890 connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
4892 drbd_info(connection, "Handshake successful: "
4893 "Agreed network protocol version %d\n", connection->agreed_pro_version);
4895 drbd_info(connection, "Agreed to%ssupport TRIM on protocol level\n",
4896 connection->agreed_features & FF_TRIM ? " " : " not ");
4898 return 1;
4900 incompat:
4901 drbd_err(connection, "incompatible DRBD dialects: "
4902 "I support %d-%d, peer supports %d-%d\n",
4903 PRO_VERSION_MIN, PRO_VERSION_MAX,
4904 p->protocol_min, p->protocol_max);
4905 return -1;
#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
/* Without HMAC support in the kernel, authentication always fails for good. */
static int drbd_do_auth(struct drbd_connection *connection)
{
	drbd_err(connection, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
	drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
	return -1;
}
#else
#define CHALLENGE_LEN 64

/*
 * Mutual challenge/response authentication, keyed with the shared secret:
 * send our random challenge, receive the peer's challenge, answer it with
 * its HMAC, then receive the peer's answer to our challenge and compare it
 * with the HMAC we computed ourselves.
 *
 * Return value:
 *	1 - auth succeeded,
 *	0 - failed, try again (network error),
 *	-1 - auth failed, don't try again.
 */
static int drbd_do_auth(struct drbd_connection *connection)
{
	struct drbd_socket *sock;
	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
	char *response = NULL;
	char *right_response = NULL;
	char *peers_ch = NULL;
	unsigned int key_len;
	char secret[SHARED_SECRET_MAX]; /* 64 byte */
	unsigned int resp_size;
	SHASH_DESC_ON_STACK(desc, connection->cram_hmac_tfm);
	struct packet_info pi;
	struct net_conf *nc;
	int err, rv;

	/* FIXME: Put the challenge/response into the preallocated socket buffer.  */

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	key_len = strlen(nc->shared_secret);
	memcpy(secret, nc->shared_secret, key_len);
	rcu_read_unlock();

	desc->tfm = connection->cram_hmac_tfm;
	desc->flags = 0;

	rv = crypto_shash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
	/* The key lives in the tfm now.  Do not leave a copy of the shared
	 * secret behind on the stack. */
	memzero_explicit(secret, sizeof(secret));
	if (rv) {
		drbd_err(connection, "crypto_shash_setkey() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	get_random_bytes(my_challenge, CHALLENGE_LEN);

	sock = &connection->data;
	if (!conn_prepare_command(connection, sock)) {
		rv = 0;
		goto fail;
	}
	rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
				my_challenge, CHALLENGE_LEN);
	if (!rv)
		goto fail;

	err = drbd_recv_header(connection, &pi);
	if (err) {
		rv = 0;
		goto fail;
	}

	if (pi.cmd != P_AUTH_CHALLENGE) {
		drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
			 cmdname(pi.cmd), pi.cmd);
		rv = 0;
		goto fail;
	}

	if (pi.size > CHALLENGE_LEN * 2) {
		drbd_err(connection, "expected AuthChallenge payload too big.\n");
		rv = -1;
		goto fail;
	}

	/* the memcmp() against our own challenge below reads CHALLENGE_LEN
	 * bytes of the peer's challenge */
	if (pi.size < CHALLENGE_LEN) {
		drbd_err(connection, "AuthChallenge payload too small.\n");
		rv = -1;
		goto fail;
	}

	peers_ch = kmalloc(pi.size, GFP_NOIO);
	if (peers_ch == NULL) {
		drbd_err(connection, "kmalloc of peers_ch failed\n");
		rv = -1;
		goto fail;
	}

	err = drbd_recv_all_warn(connection, peers_ch, pi.size);
	if (err) {
		rv = 0;
		goto fail;
	}

	/* a peer that merely echoes our challenge could also echo our
	 * response; refuse that */
	if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
		drbd_err(connection, "Peer presented the same challenge!\n");
		rv = -1;
		goto fail;
	}

	resp_size = crypto_shash_digestsize(connection->cram_hmac_tfm);
	response = kmalloc(resp_size, GFP_NOIO);
	if (response == NULL) {
		drbd_err(connection, "kmalloc of response failed\n");
		rv = -1;
		goto fail;
	}

	rv = crypto_shash_digest(desc, peers_ch, pi.size, response);
	if (rv) {
		drbd_err(connection, "crypto_shash_digest() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	if (!conn_prepare_command(connection, sock)) {
		rv = 0;
		goto fail;
	}
	rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
				response, resp_size);
	if (!rv)
		goto fail;

	err = drbd_recv_header(connection, &pi);
	if (err) {
		rv = 0;
		goto fail;
	}

	if (pi.cmd != P_AUTH_RESPONSE) {
		drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
			 cmdname(pi.cmd), pi.cmd);
		rv = 0;
		goto fail;
	}

	if (pi.size != resp_size) {
		drbd_err(connection, "expected AuthResponse payload of wrong size\n");
		rv = 0;
		goto fail;
	}

	/* the response buffer is reused for the peer's answer */
	err = drbd_recv_all_warn(connection, response , resp_size);
	if (err) {
		rv = 0;
		goto fail;
	}

	right_response = kmalloc(resp_size, GFP_NOIO);
	if (right_response == NULL) {
		drbd_err(connection, "kmalloc of right_response failed\n");
		rv = -1;
		goto fail;
	}

	rv = crypto_shash_digest(desc, my_challenge, CHALLENGE_LEN,
				 right_response);
	if (rv) {
		drbd_err(connection, "crypto_shash_digest() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	rv = !memcmp(response, right_response, resp_size);

	if (rv)
		drbd_info(connection, "Peer authenticated using %u bytes HMAC\n",
		     resp_size);
	else
		rv = -1;

 fail:
	kfree(peers_ch);
	kfree(response);
	kfree(right_response);
	shash_desc_zero(desc);

	return rv;
}
#endif
5095 int drbd_receiver(struct drbd_thread *thi)
5097 struct drbd_connection *connection = thi->connection;
5098 int h;
5100 drbd_info(connection, "receiver (re)started\n");
5102 do {
5103 h = conn_connect(connection);
5104 if (h == 0) {
5105 conn_disconnect(connection);
5106 schedule_timeout_interruptible(HZ);
5108 if (h == -1) {
5109 drbd_warn(connection, "Discarding network configuration.\n");
5110 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5112 } while (h == 0);
5114 if (h > 0)
5115 drbdd(connection);
5117 conn_disconnect(connection);
5119 drbd_info(connection, "receiver terminated\n");
5120 return 0;
5123 /* ********* acknowledge sender ******** */
5125 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5127 struct p_req_state_reply *p = pi->data;
5128 int retcode = be32_to_cpu(p->retcode);
5130 if (retcode >= SS_SUCCESS) {
5131 set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
5132 } else {
5133 set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5134 drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5135 drbd_set_st_err_str(retcode), retcode);
5137 wake_up(&connection->ping_wait);
5139 return 0;
5142 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5144 struct drbd_peer_device *peer_device;
5145 struct drbd_device *device;
5146 struct p_req_state_reply *p = pi->data;
5147 int retcode = be32_to_cpu(p->retcode);
5149 peer_device = conn_peer_device(connection, pi->vnr);
5150 if (!peer_device)
5151 return -EIO;
5152 device = peer_device->device;
5154 if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5155 D_ASSERT(device, connection->agreed_pro_version < 100);
5156 return got_conn_RqSReply(connection, pi);
5159 if (retcode >= SS_SUCCESS) {
5160 set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5161 } else {
5162 set_bit(CL_ST_CHG_FAIL, &device->flags);
5163 drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5164 drbd_set_st_err_str(retcode), retcode);
5166 wake_up(&device->state_wait);
5168 return 0;
/*
 * Answer a ping from the peer with a ping ack.
 * Returns the result of drbd_send_ping_ack(); @pi is not used.
 */
static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
{
	int err = drbd_send_ping_ack(connection);

	return err;
}
5177 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5179 /* restore idle timeout */
5180 connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
5181 if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5182 wake_up(&connection->ping_wait);
5184 return 0;
5187 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5189 struct drbd_peer_device *peer_device;
5190 struct drbd_device *device;
5191 struct p_block_ack *p = pi->data;
5192 sector_t sector = be64_to_cpu(p->sector);
5193 int blksize = be32_to_cpu(p->blksize);
5195 peer_device = conn_peer_device(connection, pi->vnr);
5196 if (!peer_device)
5197 return -EIO;
5198 device = peer_device->device;
5200 D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5202 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5204 if (get_ldev(device)) {
5205 drbd_rs_complete_io(device, sector);
5206 drbd_set_in_sync(device, sector, blksize);
5207 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5208 device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5209 put_ldev(device);
5211 dec_rs_pending(device);
5212 atomic_add(blksize >> 9, &device->rs_sect_in);
5214 return 0;
5217 static int
5218 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5219 struct rb_root *root, const char *func,
5220 enum drbd_req_event what, bool missing_ok)
5222 struct drbd_request *req;
5223 struct bio_and_error m;
5225 spin_lock_irq(&device->resource->req_lock);
5226 req = find_request(device, root, id, sector, missing_ok, func);
5227 if (unlikely(!req)) {
5228 spin_unlock_irq(&device->resource->req_lock);
5229 return -EIO;
5231 __req_mod(req, what, &m);
5232 spin_unlock_irq(&device->resource->req_lock);
5234 if (m.bio)
5235 complete_master_bio(device, &m);
5236 return 0;
5239 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5241 struct drbd_peer_device *peer_device;
5242 struct drbd_device *device;
5243 struct p_block_ack *p = pi->data;
5244 sector_t sector = be64_to_cpu(p->sector);
5245 int blksize = be32_to_cpu(p->blksize);
5246 enum drbd_req_event what;
5248 peer_device = conn_peer_device(connection, pi->vnr);
5249 if (!peer_device)
5250 return -EIO;
5251 device = peer_device->device;
5253 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5255 if (p->block_id == ID_SYNCER) {
5256 drbd_set_in_sync(device, sector, blksize);
5257 dec_rs_pending(device);
5258 return 0;
5260 switch (pi->cmd) {
5261 case P_RS_WRITE_ACK:
5262 what = WRITE_ACKED_BY_PEER_AND_SIS;
5263 break;
5264 case P_WRITE_ACK:
5265 what = WRITE_ACKED_BY_PEER;
5266 break;
5267 case P_RECV_ACK:
5268 what = RECV_ACKED_BY_PEER;
5269 break;
5270 case P_SUPERSEDED:
5271 what = CONFLICT_RESOLVED;
5272 break;
5273 case P_RETRY_WRITE:
5274 what = POSTPONE_WRITE;
5275 break;
5276 default:
5277 BUG();
5280 return validate_req_change_req_state(device, p->block_id, sector,
5281 &device->write_requests, __func__,
5282 what, false);
5285 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5287 struct drbd_peer_device *peer_device;
5288 struct drbd_device *device;
5289 struct p_block_ack *p = pi->data;
5290 sector_t sector = be64_to_cpu(p->sector);
5291 int size = be32_to_cpu(p->blksize);
5292 int err;
5294 peer_device = conn_peer_device(connection, pi->vnr);
5295 if (!peer_device)
5296 return -EIO;
5297 device = peer_device->device;
5299 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5301 if (p->block_id == ID_SYNCER) {
5302 dec_rs_pending(device);
5303 drbd_rs_failed_io(device, sector, size);
5304 return 0;
5307 err = validate_req_change_req_state(device, p->block_id, sector,
5308 &device->write_requests, __func__,
5309 NEG_ACKED, true);
5310 if (err) {
5311 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5312 The master bio might already be completed, therefore the
5313 request is no longer in the collision hash. */
5314 /* In Protocol B we might already have got a P_RECV_ACK
5315 but then get a P_NEG_ACK afterwards. */
5316 drbd_set_out_of_sync(device, sector, size);
5318 return 0;
5321 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5323 struct drbd_peer_device *peer_device;
5324 struct drbd_device *device;
5325 struct p_block_ack *p = pi->data;
5326 sector_t sector = be64_to_cpu(p->sector);
5328 peer_device = conn_peer_device(connection, pi->vnr);
5329 if (!peer_device)
5330 return -EIO;
5331 device = peer_device->device;
5333 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5335 drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5336 (unsigned long long)sector, be32_to_cpu(p->blksize));
5338 return validate_req_change_req_state(device, p->block_id, sector,
5339 &device->read_requests, __func__,
5340 NEG_ACKED, false);
5343 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5345 struct drbd_peer_device *peer_device;
5346 struct drbd_device *device;
5347 sector_t sector;
5348 int size;
5349 struct p_block_ack *p = pi->data;
5351 peer_device = conn_peer_device(connection, pi->vnr);
5352 if (!peer_device)
5353 return -EIO;
5354 device = peer_device->device;
5356 sector = be64_to_cpu(p->sector);
5357 size = be32_to_cpu(p->blksize);
5359 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5361 dec_rs_pending(device);
5363 if (get_ldev_if_state(device, D_FAILED)) {
5364 drbd_rs_complete_io(device, sector);
5365 switch (pi->cmd) {
5366 case P_NEG_RS_DREPLY:
5367 drbd_rs_failed_io(device, sector, size);
5368 case P_RS_CANCEL:
5369 break;
5370 default:
5371 BUG();
5373 put_ldev(device);
5376 return 0;
5379 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5381 struct p_barrier_ack *p = pi->data;
5382 struct drbd_peer_device *peer_device;
5383 int vnr;
5385 tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5387 rcu_read_lock();
5388 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5389 struct drbd_device *device = peer_device->device;
5391 if (device->state.conn == C_AHEAD &&
5392 atomic_read(&device->ap_in_flight) == 0 &&
5393 !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5394 device->start_resync_timer.expires = jiffies + HZ;
5395 add_timer(&device->start_resync_timer);
5398 rcu_read_unlock();
5400 return 0;
5403 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5405 struct drbd_peer_device *peer_device;
5406 struct drbd_device *device;
5407 struct p_block_ack *p = pi->data;
5408 struct drbd_device_work *dw;
5409 sector_t sector;
5410 int size;
5412 peer_device = conn_peer_device(connection, pi->vnr);
5413 if (!peer_device)
5414 return -EIO;
5415 device = peer_device->device;
5417 sector = be64_to_cpu(p->sector);
5418 size = be32_to_cpu(p->blksize);
5420 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5422 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5423 drbd_ov_out_of_sync_found(device, sector, size);
5424 else
5425 ov_out_of_sync_print(device);
5427 if (!get_ldev(device))
5428 return 0;
5430 drbd_rs_complete_io(device, sector);
5431 dec_rs_pending(device);
5433 --device->ov_left;
5435 /* let's advance progress step marks only for every other megabyte */
5436 if ((device->ov_left & 0x200) == 0x200)
5437 drbd_advance_rs_marks(device, device->ov_left);
5439 if (device->ov_left == 0) {
5440 dw = kmalloc(sizeof(*dw), GFP_NOIO);
5441 if (dw) {
5442 dw->w.cb = w_ov_finished;
5443 dw->device = device;
5444 drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5445 } else {
5446 drbd_err(device, "kmalloc(dw) failed.");
5447 ov_out_of_sync_print(device);
5448 drbd_resync_finished(device);
5451 put_ldev(device);
5452 return 0;
/*
 * Handler for meta packets that are received but deliberately ignored.
 * Neither argument is used; always returns 0.
 */
static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
{
	return 0;
}
5460 struct meta_sock_cmd {
5461 size_t pkt_size;
5462 int (*fn)(struct drbd_connection *connection, struct packet_info *);
5465 static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout)
5467 long t;
5468 struct net_conf *nc;
5470 rcu_read_lock();
5471 nc = rcu_dereference(connection->net_conf);
5472 t = ping_timeout ? nc->ping_timeo : nc->ping_int;
5473 rcu_read_unlock();
5475 t *= HZ;
5476 if (ping_timeout)
5477 t /= 10;
5479 connection->meta.socket->sk->sk_rcvtimeo = t;
5482 static void set_ping_timeout(struct drbd_connection *connection)
5484 set_rcvtimeo(connection, 1);
5487 static void set_idle_timeout(struct drbd_connection *connection)
5489 set_rcvtimeo(connection, 0);
5492 static struct meta_sock_cmd ack_receiver_tbl[] = {
5493 [P_PING] = { 0, got_Ping },
5494 [P_PING_ACK] = { 0, got_PingAck },
5495 [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5496 [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5497 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5498 [P_SUPERSEDED] = { sizeof(struct p_block_ack), got_BlockAck },
5499 [P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck },
5500 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply },
5501 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply },
5502 [P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult },
5503 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck },
5504 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5505 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync },
5506 [P_DELAY_PROBE] = { sizeof(struct p_delay_probe93), got_skip },
5507 [P_RS_CANCEL] = { sizeof(struct p_block_ack), got_NegRSDReply },
5508 [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
5509 [P_RETRY_WRITE] = { sizeof(struct p_block_ack), got_BlockAck },
5512 int drbd_ack_receiver(struct drbd_thread *thi)
5514 struct drbd_connection *connection = thi->connection;
5515 struct meta_sock_cmd *cmd = NULL;
5516 struct packet_info pi;
5517 unsigned long pre_recv_jif;
5518 int rv;
5519 void *buf = connection->meta.rbuf;
5520 int received = 0;
5521 unsigned int header_size = drbd_header_size(connection);
5522 int expect = header_size;
5523 bool ping_timeout_active = false;
5524 struct sched_param param = { .sched_priority = 2 };
5526 rv = sched_setscheduler(current, SCHED_RR, &param);
5527 if (rv < 0)
5528 drbd_err(connection, "drbd_ack_receiver: ERROR set priority, ret=%d\n", rv);
5530 while (get_t_state(thi) == RUNNING) {
5531 drbd_thread_current_set_cpu(thi);
5533 conn_reclaim_net_peer_reqs(connection);
5535 if (test_and_clear_bit(SEND_PING, &connection->flags)) {
5536 if (drbd_send_ping(connection)) {
5537 drbd_err(connection, "drbd_send_ping has failed\n");
5538 goto reconnect;
5540 set_ping_timeout(connection);
5541 ping_timeout_active = true;
5544 pre_recv_jif = jiffies;
5545 rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
5547 /* Note:
5548 * -EINTR (on meta) we got a signal
5549 * -EAGAIN (on meta) rcvtimeo expired
5550 * -ECONNRESET other side closed the connection
5551 * -ERESTARTSYS (on data) we got a signal
5552 * rv < 0 other than above: unexpected error!
5553 * rv == expected: full header or command
5554 * rv < expected: "woken" by signal during receive
5555 * rv == 0 : "connection shut down by peer"
5557 if (likely(rv > 0)) {
5558 received += rv;
5559 buf += rv;
5560 } else if (rv == 0) {
5561 if (test_bit(DISCONNECT_SENT, &connection->flags)) {
5562 long t;
5563 rcu_read_lock();
5564 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
5565 rcu_read_unlock();
5567 t = wait_event_timeout(connection->ping_wait,
5568 connection->cstate < C_WF_REPORT_PARAMS,
5570 if (t)
5571 break;
5573 drbd_err(connection, "meta connection shut down by peer.\n");
5574 goto reconnect;
5575 } else if (rv == -EAGAIN) {
5576 /* If the data socket received something meanwhile,
5577 * that is good enough: peer is still alive. */
5578 if (time_after(connection->last_received, pre_recv_jif))
5579 continue;
5580 if (ping_timeout_active) {
5581 drbd_err(connection, "PingAck did not arrive in time.\n");
5582 goto reconnect;
5584 set_bit(SEND_PING, &connection->flags);
5585 continue;
5586 } else if (rv == -EINTR) {
5587 /* maybe drbd_thread_stop(): the while condition will notice.
5588 * maybe woken for send_ping: we'll send a ping above,
5589 * and change the rcvtimeo */
5590 flush_signals(current);
5591 continue;
5592 } else {
5593 drbd_err(connection, "sock_recvmsg returned %d\n", rv);
5594 goto reconnect;
5597 if (received == expect && cmd == NULL) {
5598 if (decode_header(connection, connection->meta.rbuf, &pi))
5599 goto reconnect;
5600 cmd = &ack_receiver_tbl[pi.cmd];
5601 if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) || !cmd->fn) {
5602 drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
5603 cmdname(pi.cmd), pi.cmd);
5604 goto disconnect;
5606 expect = header_size + cmd->pkt_size;
5607 if (pi.size != expect - header_size) {
5608 drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
5609 pi.cmd, pi.size);
5610 goto reconnect;
5613 if (received == expect) {
5614 bool err;
5616 err = cmd->fn(connection, &pi);
5617 if (err) {
5618 drbd_err(connection, "%pf failed\n", cmd->fn);
5619 goto reconnect;
5622 connection->last_received = jiffies;
5624 if (cmd == &ack_receiver_tbl[P_PING_ACK]) {
5625 set_idle_timeout(connection);
5626 ping_timeout_active = false;
5629 buf = connection->meta.rbuf;
5630 received = 0;
5631 expect = header_size;
5632 cmd = NULL;
5636 if (0) {
5637 reconnect:
5638 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5639 conn_md_sync(connection);
5641 if (0) {
5642 disconnect:
5643 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5646 drbd_info(connection, "ack_receiver terminated\n");
5648 return 0;
5651 void drbd_send_acks_wf(struct work_struct *ws)
5653 struct drbd_peer_device *peer_device =
5654 container_of(ws, struct drbd_peer_device, send_acks_work);
5655 struct drbd_connection *connection = peer_device->connection;
5656 struct drbd_device *device = peer_device->device;
5657 struct net_conf *nc;
5658 int tcp_cork, err;
5660 rcu_read_lock();
5661 nc = rcu_dereference(connection->net_conf);
5662 tcp_cork = nc->tcp_cork;
5663 rcu_read_unlock();
5665 if (tcp_cork)
5666 drbd_tcp_cork(connection->meta.socket);
5668 err = drbd_finish_peer_reqs(device);
5669 kref_put(&device->kref, drbd_destroy_device);
5670 /* get is in drbd_endio_write_sec_final(). That is necessary to keep the
5671 struct work_struct send_acks_work alive, which is in the peer_device object */
5673 if (err) {
5674 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5675 return;
5678 if (tcp_cork)
5679 drbd_tcp_uncork(connection->meta.socket);
5681 return;