4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26 #include <linux/module.h>
28 #include <linux/uaccess.h>
31 #include <linux/drbd.h>
33 #include <linux/file.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <uapi/linux/sched/types.h>
40 #include <linux/sched/signal.h>
41 #include <linux/pkt_sched.h>
42 #define __KERNEL_SYSCALLS__
43 #include <linux/unistd.h>
44 #include <linux/vmalloc.h>
45 #include <linux/random.h>
46 #include <linux/string.h>
47 #include <linux/scatterlist.h>
49 #include "drbd_protocol.h"
53 #define PRO_FEATURES (DRBD_FF_TRIM|DRBD_FF_THIN_RESYNC|DRBD_FF_WSAME|DRBD_FF_WZEROES)
68 static int drbd_do_features(struct drbd_connection
*connection
);
69 static int drbd_do_auth(struct drbd_connection
*connection
);
70 static int drbd_disconnected(struct drbd_peer_device
*);
71 static void conn_wait_active_ee_empty(struct drbd_connection
*connection
);
72 static enum finish_epoch
drbd_may_finish_epoch(struct drbd_connection
*, struct drbd_epoch
*, enum epoch_event
);
73 static int e_end_block(struct drbd_work
*, int);
76 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
79 * some helper functions to deal with single linked page lists,
80 * page->private being our "next" pointer.
83 /* If at least n pages are linked at head, get n pages off.
84 * Otherwise, don't modify head, and return NULL.
85 * Locking is the responsibility of the caller.
/* NOTE(review): extraction fragment -- the interior of this function is
 * partially elided here, so comments describe only the visible lines.
 * Visible contract: walk the chain via page_chain_next() counting pages;
 * on success, terminate the returned sub-chain with set_page_private(.., 0)
 * so page->private (the "next" pointer) ends the list.  On insufficient
 * pages, none are taken (per the in-line comment below). */
static struct page
*page_chain_del(struct page
**head
, int n
)
101 tmp
= page_chain_next(page
);
103 break; /* found sufficient pages */
105 /* insufficient pages, don't use any of them. */
110 /* add end of list marker for the returned list */
111 set_page_private(page
, 0);
112 /* actual return value, and adjustment of head */
118 /* may be used outside of locks to find the tail of a (usually short)
119 * "private" page chain, before adding it back to a global chain head
120 * with page_chain_add() under a spinlock. */
121 static struct page
*page_chain_tail(struct page
*page
, int *len
)
125 while ((tmp
= page_chain_next(page
)))
/* Walk the whole page chain with the _safe iterator (each page may be
 * released while iterating) -- presumably freeing every page and
 * returning a count; TODO(review) confirm against the full source, the
 * loop body is elided in this extraction. */
132 static int page_chain_free(struct page
*page
)
136 page_chain_for_each_safe(page
, tmp
) {
/* Splice the chain [chain_first .. chain_last] onto the front of the
 * list at *head.  The BUG_ON asserts that chain_last really is the tail
 * of chain_first's chain (page_chain_tail() recomputes it); chain_last's
 * page->private is then pointed at the old *head.  Caller is responsible
 * for locking (see comment preceding page_chain_tail above). */
143 static void page_chain_add(struct page
**head
,
144 struct page
*chain_first
, struct page
*chain_last
)
148 tmp
= page_chain_tail(chain_first
, NULL
);
149 BUG_ON(tmp
!= chain_last
);
152 /* add chain to head */
153 set_page_private(chain_last
, (unsigned long)*head
);
157 static struct page
*__drbd_alloc_pages(struct drbd_device
*device
,
160 struct page
*page
= NULL
;
161 struct page
*tmp
= NULL
;
164 /* Yes, testing drbd_pp_vacant outside the lock is racy.
165 * So what. It saves a spin_lock. */
166 if (drbd_pp_vacant
>= number
) {
167 spin_lock(&drbd_pp_lock
);
168 page
= page_chain_del(&drbd_pp_pool
, number
);
170 drbd_pp_vacant
-= number
;
171 spin_unlock(&drbd_pp_lock
);
176 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
177 * "criss-cross" setup, that might cause write-out on some other DRBD,
178 * which in turn might block on the other node at this very place. */
179 for (i
= 0; i
< number
; i
++) {
180 tmp
= alloc_page(GFP_TRY
);
183 set_page_private(tmp
, (unsigned long)page
);
190 /* Not enough pages immediately available this time.
191 * No need to jump around here, drbd_alloc_pages will retry this
192 * function "soon". */
194 tmp
= page_chain_tail(page
, NULL
);
195 spin_lock(&drbd_pp_lock
);
196 page_chain_add(&drbd_pp_pool
, page
, tmp
);
198 spin_unlock(&drbd_pp_lock
);
/* Scan device->net_ee (peer requests whose pages were lent to the
 * network stack) and move the already-finished ones onto @to_be_freed.
 * Because entries complete in list order, the scan can stop at the
 * first request that still has an active page (see in-line comment).
 * NOTE(review): the "break" statement itself is elided in this
 * extraction; the visible condition is drbd_peer_req_has_active_page().
 * Caller must hold resource->req_lock (see drbd_reclaim_net_peer_reqs). */
203 static void reclaim_finished_net_peer_reqs(struct drbd_device
*device
,
204 struct list_head
*to_be_freed
)
206 struct drbd_peer_request
*peer_req
, *tmp
;
208 /* The EEs are always appended to the end of the list. Since
209 they are sent in order over the wire, they have to finish
210 in order. As soon as we see the first not finished we can
211 stop to examine the list... */
213 list_for_each_entry_safe(peer_req
, tmp
, &device
->net_ee
, w
.list
) {
214 if (drbd_peer_req_has_active_page(peer_req
))
216 list_move(&peer_req
->w
.list
, to_be_freed
);
/* Collect finished net_ee peer requests under resource->req_lock into a
 * local list, then free them outside the lock via
 * drbd_free_net_peer_req() -- keeping the locked section minimal. */
220 static void drbd_reclaim_net_peer_reqs(struct drbd_device
*device
)
222 LIST_HEAD(reclaimed
);
223 struct drbd_peer_request
*peer_req
, *t
;
225 spin_lock_irq(&device
->resource
->req_lock
);
226 reclaim_finished_net_peer_reqs(device
, &reclaimed
);
227 spin_unlock_irq(&device
->resource
->req_lock
);
228 list_for_each_entry_safe(peer_req
, t
, &reclaimed
, w
.list
)
229 drbd_free_net_peer_req(device
, peer_req
);
/* Run drbd_reclaim_net_peer_reqs() on every volume of this connection.
 * Devices with no pages lent to the network (pp_in_use_by_net == 0) are
 * skipped cheaply.  kref_get/kref_put pin each device across the call so
 * it cannot be destroyed while we touch it.
 * NOTE(review): the RCU/locking lines around the idr walk are elided in
 * this extraction -- confirm against the full source. */
232 static void conn_reclaim_net_peer_reqs(struct drbd_connection
*connection
)
234 struct drbd_peer_device
*peer_device
;
238 idr_for_each_entry(&connection
->peer_devices
, peer_device
, vnr
) {
239 struct drbd_device
*device
= peer_device
->device
;
240 if (!atomic_read(&device
->pp_in_use_by_net
))
243 kref_get(&device
->kref
);
245 drbd_reclaim_net_peer_reqs(device
);
246 kref_put(&device
->kref
, drbd_destroy_device
);
253 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
254 * @device: DRBD device.
255 * @number: number of pages requested
256 * @retry: whether to retry, if not enough pages are available right now
258 * Tries to allocate number pages, first from our own page pool, then from
260 * Possibly retry until DRBD frees sufficient pages somewhere else.
262 * If this allocation would exceed the max_buffers setting, we throttle
263 * allocation (schedule_timeout) to give the system some room to breathe.
265 * We do not use max-buffers as hard limit, because it could lead to
266 * congestion and further to a distributed deadlock during online-verify or
267 * (checksum based) resync, if the max-buffers, socket buffer sizes and
268 * resync-rate settings are mis-configured.
270 * Returns a page chain linked via page->private.
272 struct page
*drbd_alloc_pages(struct drbd_peer_device
*peer_device
, unsigned int number
,
275 struct drbd_device
*device
= peer_device
->device
;
276 struct page
*page
= NULL
;
282 nc
= rcu_dereference(peer_device
->connection
->net_conf
);
283 mxb
= nc
? nc
->max_buffers
: 1000000;
286 if (atomic_read(&device
->pp_in_use
) < mxb
)
287 page
= __drbd_alloc_pages(device
, number
);
289 /* Try to keep the fast path fast, but occasionally we need
290 * to reclaim the pages we lended to the network stack. */
291 if (page
&& atomic_read(&device
->pp_in_use_by_net
) > 512)
292 drbd_reclaim_net_peer_reqs(device
);
294 while (page
== NULL
) {
295 prepare_to_wait(&drbd_pp_wait
, &wait
, TASK_INTERRUPTIBLE
);
297 drbd_reclaim_net_peer_reqs(device
);
299 if (atomic_read(&device
->pp_in_use
) < mxb
) {
300 page
= __drbd_alloc_pages(device
, number
);
308 if (signal_pending(current
)) {
309 drbd_warn(device
, "drbd_alloc_pages interrupted!\n");
313 if (schedule_timeout(HZ
/10) == 0)
316 finish_wait(&drbd_pp_wait
, &wait
);
319 atomic_add(number
, &device
->pp_in_use
);
323 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
324 * Is also used from inside an other spin_lock_irq(&resource->req_lock);
325 * Either links the page chain back to the global pool,
326 * or returns all pages to the system. */
327 static void drbd_free_pages(struct drbd_device
*device
, struct page
*page
, int is_net
)
329 atomic_t
*a
= is_net
? &device
->pp_in_use_by_net
: &device
->pp_in_use
;
335 if (drbd_pp_vacant
> (DRBD_MAX_BIO_SIZE
/PAGE_SIZE
) * drbd_minor_count
)
336 i
= page_chain_free(page
);
339 tmp
= page_chain_tail(page
, &i
);
340 spin_lock(&drbd_pp_lock
);
341 page_chain_add(&drbd_pp_pool
, page
, tmp
);
343 spin_unlock(&drbd_pp_lock
);
345 i
= atomic_sub_return(i
, a
);
347 drbd_warn(device
, "ASSERTION FAILED: %s: %d < 0\n",
348 is_net
? "pp_in_use_by_net" : "pp_in_use", i
);
349 wake_up(&drbd_pp_wait
);
353 You need to hold the req_lock:
354 _drbd_wait_ee_list_empty()
356 You must not have the req_lock:
358 drbd_alloc_peer_req()
359 drbd_free_peer_reqs()
361 drbd_finish_peer_reqs()
363 drbd_wait_ee_list_empty()
366 /* normal: payload_size == request size (bi_size)
367 * w_same: payload_size == logical_block_size
368 * trim: payload_size == 0 */
369 struct drbd_peer_request
*
370 drbd_alloc_peer_req(struct drbd_peer_device
*peer_device
, u64 id
, sector_t sector
,
371 unsigned int request_size
, unsigned int payload_size
, gfp_t gfp_mask
) __must_hold(local
)
373 struct drbd_device
*device
= peer_device
->device
;
374 struct drbd_peer_request
*peer_req
;
375 struct page
*page
= NULL
;
376 unsigned nr_pages
= (payload_size
+ PAGE_SIZE
-1) >> PAGE_SHIFT
;
378 if (drbd_insert_fault(device
, DRBD_FAULT_AL_EE
))
381 peer_req
= mempool_alloc(&drbd_ee_mempool
, gfp_mask
& ~__GFP_HIGHMEM
);
383 if (!(gfp_mask
& __GFP_NOWARN
))
384 drbd_err(device
, "%s: allocation failed\n", __func__
);
389 page
= drbd_alloc_pages(peer_device
, nr_pages
,
390 gfpflags_allow_blocking(gfp_mask
));
395 memset(peer_req
, 0, sizeof(*peer_req
));
396 INIT_LIST_HEAD(&peer_req
->w
.list
);
397 drbd_clear_interval(&peer_req
->i
);
398 peer_req
->i
.size
= request_size
;
399 peer_req
->i
.sector
= sector
;
400 peer_req
->submit_jif
= jiffies
;
401 peer_req
->peer_device
= peer_device
;
402 peer_req
->pages
= page
;
404 * The block_id is opaque to the receiver. It is not endianness
405 * converted, and sent back to the sender unchanged.
407 peer_req
->block_id
= id
;
412 mempool_free(peer_req
, &drbd_ee_mempool
);
416 void __drbd_free_peer_req(struct drbd_device
*device
, struct drbd_peer_request
*peer_req
,
420 if (peer_req
->flags
& EE_HAS_DIGEST
)
421 kfree(peer_req
->digest
);
422 drbd_free_pages(device
, peer_req
->pages
, is_net
);
423 D_ASSERT(device
, atomic_read(&peer_req
->pending_bios
) == 0);
424 D_ASSERT(device
, drbd_interval_empty(&peer_req
->i
));
425 if (!expect(!(peer_req
->flags
& EE_CALL_AL_COMPLETE_IO
))) {
426 peer_req
->flags
&= ~EE_CALL_AL_COMPLETE_IO
;
427 drbd_al_complete_io(device
, &peer_req
->i
);
429 mempool_free(peer_req
, &drbd_ee_mempool
);
432 int drbd_free_peer_reqs(struct drbd_device
*device
, struct list_head
*list
)
434 LIST_HEAD(work_list
);
435 struct drbd_peer_request
*peer_req
, *t
;
437 int is_net
= list
== &device
->net_ee
;
439 spin_lock_irq(&device
->resource
->req_lock
);
440 list_splice_init(list
, &work_list
);
441 spin_unlock_irq(&device
->resource
->req_lock
);
443 list_for_each_entry_safe(peer_req
, t
, &work_list
, w
.list
) {
444 __drbd_free_peer_req(device
, peer_req
, is_net
);
451 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
453 static int drbd_finish_peer_reqs(struct drbd_device
*device
)
455 LIST_HEAD(work_list
);
456 LIST_HEAD(reclaimed
);
457 struct drbd_peer_request
*peer_req
, *t
;
460 spin_lock_irq(&device
->resource
->req_lock
);
461 reclaim_finished_net_peer_reqs(device
, &reclaimed
);
462 list_splice_init(&device
->done_ee
, &work_list
);
463 spin_unlock_irq(&device
->resource
->req_lock
);
465 list_for_each_entry_safe(peer_req
, t
, &reclaimed
, w
.list
)
466 drbd_free_net_peer_req(device
, peer_req
);
468 /* possible callbacks here:
469 * e_end_block, and e_end_resync_block, e_send_superseded.
470 * all ignore the last argument.
472 list_for_each_entry_safe(peer_req
, t
, &work_list
, w
.list
) {
475 /* list_del not necessary, next/prev members not touched */
476 err2
= peer_req
->w
.cb(&peer_req
->w
, !!err
);
479 drbd_free_peer_req(device
, peer_req
);
481 wake_up(&device
->ee_wait
);
486 static void _drbd_wait_ee_list_empty(struct drbd_device
*device
,
487 struct list_head
*head
)
491 /* avoids spin_lock/unlock
492 * and calling prepare_to_wait in the fast path */
493 while (!list_empty(head
)) {
494 prepare_to_wait(&device
->ee_wait
, &wait
, TASK_UNINTERRUPTIBLE
);
495 spin_unlock_irq(&device
->resource
->req_lock
);
497 finish_wait(&device
->ee_wait
, &wait
);
498 spin_lock_irq(&device
->resource
->req_lock
);
/* Locked wrapper: take resource->req_lock, wait until @head is empty via
 * _drbd_wait_ee_list_empty() (which drops and retakes the lock while
 * sleeping), then release the lock.  See the "You must not have the
 * req_lock" comment block earlier in this file. */
502 static void drbd_wait_ee_list_empty(struct drbd_device
*device
,
503 struct list_head
*head
)
505 spin_lock_irq(&device
->resource
->req_lock
);
506 _drbd_wait_ee_list_empty(device
, head
);
507 spin_unlock_irq(&device
->resource
->req_lock
);
510 static int drbd_recv_short(struct socket
*sock
, void *buf
, size_t size
, int flags
)
516 struct msghdr msg
= {
517 .msg_flags
= (flags
? flags
: MSG_WAITALL
| MSG_NOSIGNAL
)
519 iov_iter_kvec(&msg
.msg_iter
, READ
, &iov
, 1, size
);
520 return sock_recvmsg(sock
, &msg
, msg
.msg_flags
);
523 static int drbd_recv(struct drbd_connection
*connection
, void *buf
, size_t size
)
527 rv
= drbd_recv_short(connection
->data
.socket
, buf
, size
, 0);
530 if (rv
== -ECONNRESET
)
531 drbd_info(connection
, "sock was reset by peer\n");
532 else if (rv
!= -ERESTARTSYS
)
533 drbd_err(connection
, "sock_recvmsg returned %d\n", rv
);
534 } else if (rv
== 0) {
535 if (test_bit(DISCONNECT_SENT
, &connection
->flags
)) {
538 t
= rcu_dereference(connection
->net_conf
)->ping_timeo
* HZ
/10;
541 t
= wait_event_timeout(connection
->ping_wait
, connection
->cstate
< C_WF_REPORT_PARAMS
, t
);
546 drbd_info(connection
, "sock was shut down by peer\n");
550 conn_request_state(connection
, NS(conn
, C_BROKEN_PIPE
), CS_HARD
);
/* Receive exactly @size bytes via drbd_recv(); presumably converts a
 * short read into an error return -- the post-call handling is elided in
 * this extraction, TODO(review) confirm against the full source. */
556 static int drbd_recv_all(struct drbd_connection
*connection
, void *buf
, size_t size
)
560 err
= drbd_recv(connection
, buf
, size
);
/* Like drbd_recv_all(), but logs a warning on failure -- unless the
 * failure was caused by a pending signal (interrupted receive), which is
 * an expected shutdown path and not worth a warning. */
569 static int drbd_recv_all_warn(struct drbd_connection
*connection
, void *buf
, size_t size
)
573 err
= drbd_recv_all(connection
, buf
, size
);
574 if (err
&& !signal_pending(current
))
575 drbd_warn(connection
, "short read (expected size %d)\n", (int)size
);
580 * On individual connections, the socket buffer size must be set prior to the
581 * listen(2) or connect(2) calls in order to have it take effect.
582 * This is our wrapper to do so.
/* Open-coded setsockopt(SO_SNDBUF)/setsockopt(SO_RCVBUF): set the socket
 * buffer sizes directly on the struct sock and set the *_LOCK bits in
 * sk_userlocks so the kernel's auto-tuning will not override them.
 * NOTE(review): the "only if nonzero" guards around each assignment are
 * elided in this extraction -- confirm against the full source.  Must be
 * called before listen(2)/connect(2) to take effect (see comment above). */
584 static void drbd_setbufsize(struct socket
*sock
, unsigned int snd
,
587 /* open coded SO_SNDBUF, SO_RCVBUF */
589 sock
->sk
->sk_sndbuf
= snd
;
590 sock
->sk
->sk_userlocks
|= SOCK_SNDBUF_LOCK
;
593 sock
->sk
->sk_rcvbuf
= rcv
;
594 sock
->sk
->sk_userlocks
|= SOCK_RCVBUF_LOCK
;
598 static struct socket
*drbd_try_connect(struct drbd_connection
*connection
)
602 struct sockaddr_in6 src_in6
;
603 struct sockaddr_in6 peer_in6
;
605 int err
, peer_addr_len
, my_addr_len
;
606 int sndbuf_size
, rcvbuf_size
, connect_int
;
607 int disconnect_on_error
= 1;
610 nc
= rcu_dereference(connection
->net_conf
);
615 sndbuf_size
= nc
->sndbuf_size
;
616 rcvbuf_size
= nc
->rcvbuf_size
;
617 connect_int
= nc
->connect_int
;
620 my_addr_len
= min_t(int, connection
->my_addr_len
, sizeof(src_in6
));
621 memcpy(&src_in6
, &connection
->my_addr
, my_addr_len
);
623 if (((struct sockaddr
*)&connection
->my_addr
)->sa_family
== AF_INET6
)
624 src_in6
.sin6_port
= 0;
626 ((struct sockaddr_in
*)&src_in6
)->sin_port
= 0; /* AF_INET & AF_SCI */
628 peer_addr_len
= min_t(int, connection
->peer_addr_len
, sizeof(src_in6
));
629 memcpy(&peer_in6
, &connection
->peer_addr
, peer_addr_len
);
631 what
= "sock_create_kern";
632 err
= sock_create_kern(&init_net
, ((struct sockaddr
*)&src_in6
)->sa_family
,
633 SOCK_STREAM
, IPPROTO_TCP
, &sock
);
639 sock
->sk
->sk_rcvtimeo
=
640 sock
->sk
->sk_sndtimeo
= connect_int
* HZ
;
641 drbd_setbufsize(sock
, sndbuf_size
, rcvbuf_size
);
643 /* explicitly bind to the configured IP as source IP
644 * for the outgoing connections.
645 * This is needed for multihomed hosts and to be
646 * able to use lo: interfaces for drbd.
647 * Make sure to use 0 as port number, so linux selects
648 * a free one dynamically.
650 what
= "bind before connect";
651 err
= sock
->ops
->bind(sock
, (struct sockaddr
*) &src_in6
, my_addr_len
);
655 /* connect may fail, peer not yet available.
656 * stay C_WF_CONNECTION, don't go Disconnecting! */
657 disconnect_on_error
= 0;
659 err
= sock
->ops
->connect(sock
, (struct sockaddr
*) &peer_in6
, peer_addr_len
, 0);
668 /* timeout, busy, signal pending */
669 case ETIMEDOUT
: case EAGAIN
: case EINPROGRESS
:
670 case EINTR
: case ERESTARTSYS
:
671 /* peer not (yet) available, network problem */
672 case ECONNREFUSED
: case ENETUNREACH
:
673 case EHOSTDOWN
: case EHOSTUNREACH
:
674 disconnect_on_error
= 0;
677 drbd_err(connection
, "%s failed, err = %d\n", what
, err
);
679 if (disconnect_on_error
)
680 conn_request_state(connection
, NS(conn
, C_DISCONNECTING
), CS_HARD
);
686 struct accept_wait_data
{
687 struct drbd_connection
*connection
;
688 struct socket
*s_listen
;
689 struct completion door_bell
;
690 void (*original_sk_state_change
)(struct sock
*sk
);
694 static void drbd_incoming_connection(struct sock
*sk
)
696 struct accept_wait_data
*ad
= sk
->sk_user_data
;
697 void (*state_change
)(struct sock
*sk
);
699 state_change
= ad
->original_sk_state_change
;
700 if (sk
->sk_state
== TCP_ESTABLISHED
)
701 complete(&ad
->door_bell
);
705 static int prepare_listen_socket(struct drbd_connection
*connection
, struct accept_wait_data
*ad
)
707 int err
, sndbuf_size
, rcvbuf_size
, my_addr_len
;
708 struct sockaddr_in6 my_addr
;
709 struct socket
*s_listen
;
714 nc
= rcu_dereference(connection
->net_conf
);
719 sndbuf_size
= nc
->sndbuf_size
;
720 rcvbuf_size
= nc
->rcvbuf_size
;
723 my_addr_len
= min_t(int, connection
->my_addr_len
, sizeof(struct sockaddr_in6
));
724 memcpy(&my_addr
, &connection
->my_addr
, my_addr_len
);
726 what
= "sock_create_kern";
727 err
= sock_create_kern(&init_net
, ((struct sockaddr
*)&my_addr
)->sa_family
,
728 SOCK_STREAM
, IPPROTO_TCP
, &s_listen
);
734 s_listen
->sk
->sk_reuse
= SK_CAN_REUSE
; /* SO_REUSEADDR */
735 drbd_setbufsize(s_listen
, sndbuf_size
, rcvbuf_size
);
737 what
= "bind before listen";
738 err
= s_listen
->ops
->bind(s_listen
, (struct sockaddr
*)&my_addr
, my_addr_len
);
742 ad
->s_listen
= s_listen
;
743 write_lock_bh(&s_listen
->sk
->sk_callback_lock
);
744 ad
->original_sk_state_change
= s_listen
->sk
->sk_state_change
;
745 s_listen
->sk
->sk_state_change
= drbd_incoming_connection
;
746 s_listen
->sk
->sk_user_data
= ad
;
747 write_unlock_bh(&s_listen
->sk
->sk_callback_lock
);
750 err
= s_listen
->ops
->listen(s_listen
, 5);
757 sock_release(s_listen
);
759 if (err
!= -EAGAIN
&& err
!= -EINTR
&& err
!= -ERESTARTSYS
) {
760 drbd_err(connection
, "%s failed, err = %d\n", what
, err
);
761 conn_request_state(connection
, NS(conn
, C_DISCONNECTING
), CS_HARD
);
/* Undo drbd_incoming_connection()'s hook on the listen socket: restore
 * the saved sk_state_change callback and clear sk_user_data.  Done under
 * sk_callback_lock (bh-safe write lock) so it cannot race with the
 * callback firing from softirq context. */
768 static void unregister_state_change(struct sock
*sk
, struct accept_wait_data
*ad
)
770 write_lock_bh(&sk
->sk_callback_lock
);
771 sk
->sk_state_change
= ad
->original_sk_state_change
;
772 sk
->sk_user_data
= NULL
;
773 write_unlock_bh(&sk
->sk_callback_lock
);
776 static struct socket
*drbd_wait_for_connect(struct drbd_connection
*connection
, struct accept_wait_data
*ad
)
778 int timeo
, connect_int
, err
= 0;
779 struct socket
*s_estab
= NULL
;
783 nc
= rcu_dereference(connection
->net_conf
);
788 connect_int
= nc
->connect_int
;
791 timeo
= connect_int
* HZ
;
792 /* 28.5% random jitter */
793 timeo
+= (prandom_u32() & 1) ? timeo
/ 7 : -timeo
/ 7;
795 err
= wait_for_completion_interruptible_timeout(&ad
->door_bell
, timeo
);
799 err
= kernel_accept(ad
->s_listen
, &s_estab
, 0);
801 if (err
!= -EAGAIN
&& err
!= -EINTR
&& err
!= -ERESTARTSYS
) {
802 drbd_err(connection
, "accept failed, err = %d\n", err
);
803 conn_request_state(connection
, NS(conn
, C_DISCONNECTING
), CS_HARD
);
808 unregister_state_change(s_estab
->sk
, ad
);
813 static int decode_header(struct drbd_connection
*, void *, struct packet_info
*);
/* Send the initial handshake packet (@cmd is P_INITIAL_DATA or
 * P_INITIAL_META, see conn_connect) on @sock with an empty payload.
 * NOTE(review): the early-return taken when conn_prepare_command() fails
 * is elided in this extraction -- confirm the error value against the
 * full source. */
815 static int send_first_packet(struct drbd_connection
*connection
, struct drbd_socket
*sock
,
816 enum drbd_packet cmd
)
818 if (!conn_prepare_command(connection
, sock
))
820 return conn_send_command(connection
, sock
, cmd
, 0, NULL
, 0);
823 static int receive_first_packet(struct drbd_connection
*connection
, struct socket
*sock
)
825 unsigned int header_size
= drbd_header_size(connection
);
826 struct packet_info pi
;
831 nc
= rcu_dereference(connection
->net_conf
);
836 sock
->sk
->sk_rcvtimeo
= nc
->ping_timeo
* 4 * HZ
/ 10;
839 err
= drbd_recv_short(sock
, connection
->data
.rbuf
, header_size
, 0);
840 if (err
!= header_size
) {
845 err
= decode_header(connection
, connection
->data
.rbuf
, &pi
);
852 * drbd_socket_okay() - Free the socket if its connection is not okay
853 * @sock: pointer to the pointer to the socket.
855 static bool drbd_socket_okay(struct socket
**sock
)
863 rr
= drbd_recv_short(*sock
, tb
, 4, MSG_DONTWAIT
| MSG_PEEK
);
865 if (rr
> 0 || rr
== -EAGAIN
) {
874 static bool connection_established(struct drbd_connection
*connection
,
875 struct socket
**sock1
,
876 struct socket
**sock2
)
882 if (!*sock1
|| !*sock2
)
886 nc
= rcu_dereference(connection
->net_conf
);
887 timeout
= (nc
->sock_check_timeo
?: nc
->ping_timeo
) * HZ
/ 10;
889 schedule_timeout_interruptible(timeout
);
891 ok
= drbd_socket_okay(sock1
);
892 ok
= drbd_socket_okay(sock2
) && ok
;
897 /* Gets called if a connection is established, or if a new minor gets created
899 int drbd_connected(struct drbd_peer_device
*peer_device
)
901 struct drbd_device
*device
= peer_device
->device
;
904 atomic_set(&device
->packet_seq
, 0);
905 device
->peer_seq
= 0;
907 device
->state_mutex
= peer_device
->connection
->agreed_pro_version
< 100 ?
908 &peer_device
->connection
->cstate_mutex
:
909 &device
->own_state_mutex
;
911 err
= drbd_send_sync_param(peer_device
);
913 err
= drbd_send_sizes(peer_device
, 0, 0);
915 err
= drbd_send_uuids(peer_device
);
917 err
= drbd_send_current_state(peer_device
);
918 clear_bit(USE_DEGR_WFC_T
, &device
->flags
);
919 clear_bit(RESIZE_PENDING
, &device
->flags
);
920 atomic_set(&device
->ap_in_flight
, 0);
921 mod_timer(&device
->request_timer
, jiffies
+ HZ
); /* just start it here. */
927 * 1 yes, we have a valid connection
928 * 0 oops, did not work out, please try again
929 * -1 peer talks different language,
930 * no point in trying again, please go standalone.
931 * -2 We do not have a network config...
933 static int conn_connect(struct drbd_connection
*connection
)
935 struct drbd_socket sock
, msock
;
936 struct drbd_peer_device
*peer_device
;
939 bool discard_my_data
, ok
;
940 enum drbd_state_rv rv
;
941 struct accept_wait_data ad
= {
942 .connection
= connection
,
943 .door_bell
= COMPLETION_INITIALIZER_ONSTACK(ad
.door_bell
),
946 clear_bit(DISCONNECT_SENT
, &connection
->flags
);
947 if (conn_request_state(connection
, NS(conn
, C_WF_CONNECTION
), CS_VERBOSE
) < SS_SUCCESS
)
950 mutex_init(&sock
.mutex
);
951 sock
.sbuf
= connection
->data
.sbuf
;
952 sock
.rbuf
= connection
->data
.rbuf
;
954 mutex_init(&msock
.mutex
);
955 msock
.sbuf
= connection
->meta
.sbuf
;
956 msock
.rbuf
= connection
->meta
.rbuf
;
959 /* Assume that the peer only understands protocol 80 until we know better. */
960 connection
->agreed_pro_version
= 80;
962 if (prepare_listen_socket(connection
, &ad
))
968 s
= drbd_try_connect(connection
);
972 send_first_packet(connection
, &sock
, P_INITIAL_DATA
);
973 } else if (!msock
.socket
) {
974 clear_bit(RESOLVE_CONFLICTS
, &connection
->flags
);
976 send_first_packet(connection
, &msock
, P_INITIAL_META
);
978 drbd_err(connection
, "Logic error in conn_connect()\n");
979 goto out_release_sockets
;
983 if (connection_established(connection
, &sock
.socket
, &msock
.socket
))
987 s
= drbd_wait_for_connect(connection
, &ad
);
989 int fp
= receive_first_packet(connection
, s
);
990 drbd_socket_okay(&sock
.socket
);
991 drbd_socket_okay(&msock
.socket
);
995 drbd_warn(connection
, "initial packet S crossed\n");
996 sock_release(sock
.socket
);
1002 case P_INITIAL_META
:
1003 set_bit(RESOLVE_CONFLICTS
, &connection
->flags
);
1005 drbd_warn(connection
, "initial packet M crossed\n");
1006 sock_release(msock
.socket
);
1013 drbd_warn(connection
, "Error receiving initial packet\n");
1016 if (prandom_u32() & 1)
1021 if (connection
->cstate
<= C_DISCONNECTING
)
1022 goto out_release_sockets
;
1023 if (signal_pending(current
)) {
1024 flush_signals(current
);
1026 if (get_t_state(&connection
->receiver
) == EXITING
)
1027 goto out_release_sockets
;
1030 ok
= connection_established(connection
, &sock
.socket
, &msock
.socket
);
1034 sock_release(ad
.s_listen
);
1036 sock
.socket
->sk
->sk_reuse
= SK_CAN_REUSE
; /* SO_REUSEADDR */
1037 msock
.socket
->sk
->sk_reuse
= SK_CAN_REUSE
; /* SO_REUSEADDR */
1039 sock
.socket
->sk
->sk_allocation
= GFP_NOIO
;
1040 msock
.socket
->sk
->sk_allocation
= GFP_NOIO
;
1042 sock
.socket
->sk
->sk_priority
= TC_PRIO_INTERACTIVE_BULK
;
1043 msock
.socket
->sk
->sk_priority
= TC_PRIO_INTERACTIVE
;
1046 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
1047 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1048 * first set it to the P_CONNECTION_FEATURES timeout,
1049 * which we set to 4x the configured ping_timeout. */
1051 nc
= rcu_dereference(connection
->net_conf
);
1053 sock
.socket
->sk
->sk_sndtimeo
=
1054 sock
.socket
->sk
->sk_rcvtimeo
= nc
->ping_timeo
*4*HZ
/10;
1056 msock
.socket
->sk
->sk_rcvtimeo
= nc
->ping_int
*HZ
;
1057 timeout
= nc
->timeout
* HZ
/ 10;
1058 discard_my_data
= nc
->discard_my_data
;
1061 msock
.socket
->sk
->sk_sndtimeo
= timeout
;
1063 /* we don't want delays.
1064 * we use TCP_CORK where appropriate, though */
1065 drbd_tcp_nodelay(sock
.socket
);
1066 drbd_tcp_nodelay(msock
.socket
);
1068 connection
->data
.socket
= sock
.socket
;
1069 connection
->meta
.socket
= msock
.socket
;
1070 connection
->last_received
= jiffies
;
1072 h
= drbd_do_features(connection
);
1076 if (connection
->cram_hmac_tfm
) {
1077 /* drbd_request_state(device, NS(conn, WFAuth)); */
1078 switch (drbd_do_auth(connection
)) {
1080 drbd_err(connection
, "Authentication of peer failed\n");
1083 drbd_err(connection
, "Authentication of peer failed, trying again.\n");
1088 connection
->data
.socket
->sk
->sk_sndtimeo
= timeout
;
1089 connection
->data
.socket
->sk
->sk_rcvtimeo
= MAX_SCHEDULE_TIMEOUT
;
1091 if (drbd_send_protocol(connection
) == -EOPNOTSUPP
)
1094 /* Prevent a race between resync-handshake and
1095 * being promoted to Primary.
1097 * Grab and release the state mutex, so we know that any current
1098 * drbd_set_role() is finished, and any incoming drbd_set_role
1099 * will see the STATE_SENT flag, and wait for it to be cleared.
1101 idr_for_each_entry(&connection
->peer_devices
, peer_device
, vnr
)
1102 mutex_lock(peer_device
->device
->state_mutex
);
1104 /* avoid a race with conn_request_state( C_DISCONNECTING ) */
1105 spin_lock_irq(&connection
->resource
->req_lock
);
1106 set_bit(STATE_SENT
, &connection
->flags
);
1107 spin_unlock_irq(&connection
->resource
->req_lock
);
1109 idr_for_each_entry(&connection
->peer_devices
, peer_device
, vnr
)
1110 mutex_unlock(peer_device
->device
->state_mutex
);
1113 idr_for_each_entry(&connection
->peer_devices
, peer_device
, vnr
) {
1114 struct drbd_device
*device
= peer_device
->device
;
1115 kref_get(&device
->kref
);
1118 if (discard_my_data
)
1119 set_bit(DISCARD_MY_DATA
, &device
->flags
);
1121 clear_bit(DISCARD_MY_DATA
, &device
->flags
);
1123 drbd_connected(peer_device
);
1124 kref_put(&device
->kref
, drbd_destroy_device
);
1129 rv
= conn_request_state(connection
, NS(conn
, C_WF_REPORT_PARAMS
), CS_VERBOSE
);
1130 if (rv
< SS_SUCCESS
|| connection
->cstate
!= C_WF_REPORT_PARAMS
) {
1131 clear_bit(STATE_SENT
, &connection
->flags
);
1135 drbd_thread_start(&connection
->ack_receiver
);
1136 /* opencoded create_singlethread_workqueue(),
1137 * to be able to use format string arguments */
1138 connection
->ack_sender
=
1139 alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM
, connection
->resource
->name
);
1140 if (!connection
->ack_sender
) {
1141 drbd_err(connection
, "Failed to create workqueue ack_sender\n");
1145 mutex_lock(&connection
->resource
->conf_update
);
1146 /* The discard_my_data flag is a single-shot modifier to the next
1147 * connection attempt, the handshake of which is now well underway.
1148 * No need for rcu style copying of the whole struct
1149 * just to clear a single value. */
1150 connection
->net_conf
->discard_my_data
= 0;
1151 mutex_unlock(&connection
->resource
->conf_update
);
1155 out_release_sockets
:
1157 sock_release(ad
.s_listen
);
1159 sock_release(sock
.socket
);
1161 sock_release(msock
.socket
);
1165 static int decode_header(struct drbd_connection
*connection
, void *header
, struct packet_info
*pi
)
1167 unsigned int header_size
= drbd_header_size(connection
);
1169 if (header_size
== sizeof(struct p_header100
) &&
1170 *(__be32
*)header
== cpu_to_be32(DRBD_MAGIC_100
)) {
1171 struct p_header100
*h
= header
;
1173 drbd_err(connection
, "Header padding is not zero\n");
1176 pi
->vnr
= be16_to_cpu(h
->volume
);
1177 pi
->cmd
= be16_to_cpu(h
->command
);
1178 pi
->size
= be32_to_cpu(h
->length
);
1179 } else if (header_size
== sizeof(struct p_header95
) &&
1180 *(__be16
*)header
== cpu_to_be16(DRBD_MAGIC_BIG
)) {
1181 struct p_header95
*h
= header
;
1182 pi
->cmd
= be16_to_cpu(h
->command
);
1183 pi
->size
= be32_to_cpu(h
->length
);
1185 } else if (header_size
== sizeof(struct p_header80
) &&
1186 *(__be32
*)header
== cpu_to_be32(DRBD_MAGIC
)) {
1187 struct p_header80
*h
= header
;
1188 pi
->cmd
= be16_to_cpu(h
->command
);
1189 pi
->size
= be16_to_cpu(h
->length
);
1192 drbd_err(connection
, "Wrong magic value 0x%08x in protocol version %d\n",
1193 be32_to_cpu(*(__be32
*)header
),
1194 connection
->agreed_pro_version
);
1197 pi
->data
= header
+ header_size
;
/* If the receiver thread's own blk_plug is the currently active plug,
 * flush it (blk_finish_plug submits everything queued so far) and
 * immediately re-arm it, so backend I/O queued while receiving is pushed
 * to the devices without tearing down the plugging setup. */
1201 static void drbd_unplug_all_devices(struct drbd_connection
*connection
)
1203 if (current
->plug
== &connection
->receiver_plug
) {
1204 blk_finish_plug(&connection
->receiver_plug
);
1205 blk_start_plug(&connection
->receiver_plug
);
1206 } /* else: maybe just schedule() ?? */
1209 static int drbd_recv_header(struct drbd_connection
*connection
, struct packet_info
*pi
)
1211 void *buffer
= connection
->data
.rbuf
;
1214 err
= drbd_recv_all_warn(connection
, buffer
, drbd_header_size(connection
));
1218 err
= decode_header(connection
, buffer
, pi
);
1219 connection
->last_received
= jiffies
;
1224 static int drbd_recv_header_maybe_unplug(struct drbd_connection
*connection
, struct packet_info
*pi
)
1226 void *buffer
= connection
->data
.rbuf
;
1227 unsigned int size
= drbd_header_size(connection
);
1230 err
= drbd_recv_short(connection
->data
.socket
, buffer
, size
, MSG_NOSIGNAL
|MSG_DONTWAIT
);
1232 /* If we have nothing in the receive buffer now, to reduce
1233 * application latency, try to drain the backend queues as
1234 * quickly as possible, and let remote TCP know what we have
1235 * received so far. */
1236 if (err
== -EAGAIN
) {
1237 drbd_tcp_quickack(connection
->data
.socket
);
1238 drbd_unplug_all_devices(connection
);
1244 err
= drbd_recv_all_warn(connection
, buffer
, size
);
1249 err
= decode_header(connection
, connection
->data
.rbuf
, pi
);
1250 connection
->last_received
= jiffies
;
1254 /* This is blkdev_issue_flush, but asynchronous.
1255 * We want to submit to all component volumes in parallel,
1256 * then wait for all completions.
1258 struct issue_flush_context
{
1261 struct completion done
;
1263 struct one_flush_context
{
1264 struct drbd_device
*device
;
1265 struct issue_flush_context
*ctx
;
1268 static void one_flush_endio(struct bio
*bio
)
1270 struct one_flush_context
*octx
= bio
->bi_private
;
1271 struct drbd_device
*device
= octx
->device
;
1272 struct issue_flush_context
*ctx
= octx
->ctx
;
1274 if (bio
->bi_status
) {
1275 ctx
->error
= blk_status_to_errno(bio
->bi_status
);
1276 drbd_info(device
, "local disk FLUSH FAILED with status %d\n", bio
->bi_status
);
1281 clear_bit(FLUSH_PENDING
, &device
->flags
);
1283 kref_put(&device
->kref
, drbd_destroy_device
);
1285 if (atomic_dec_and_test(&ctx
->pending
))
1286 complete(&ctx
->done
);
1289 static void submit_one_flush(struct drbd_device
*device
, struct issue_flush_context
*ctx
)
1291 struct bio
*bio
= bio_alloc(GFP_NOIO
, 0);
1292 struct one_flush_context
*octx
= kmalloc(sizeof(*octx
), GFP_NOIO
);
1293 if (!bio
|| !octx
) {
1294 drbd_warn(device
, "Could not allocate a bio, CANNOT ISSUE FLUSH\n");
1295 /* FIXME: what else can I do now? disconnecting or detaching
1296 * really does not help to improve the state of the world, either.
1302 ctx
->error
= -ENOMEM
;
1304 kref_put(&device
->kref
, drbd_destroy_device
);
1308 octx
->device
= device
;
1310 bio_set_dev(bio
, device
->ldev
->backing_bdev
);
1311 bio
->bi_private
= octx
;
1312 bio
->bi_end_io
= one_flush_endio
;
1313 bio
->bi_opf
= REQ_OP_FLUSH
| REQ_PREFLUSH
;
1315 device
->flush_jif
= jiffies
;
1316 set_bit(FLUSH_PENDING
, &device
->flags
);
1317 atomic_inc(&ctx
->pending
);
1321 static void drbd_flush(struct drbd_connection
*connection
)
1323 if (connection
->resource
->write_ordering
>= WO_BDEV_FLUSH
) {
1324 struct drbd_peer_device
*peer_device
;
1325 struct issue_flush_context ctx
;
1328 atomic_set(&ctx
.pending
, 1);
1330 init_completion(&ctx
.done
);
1333 idr_for_each_entry(&connection
->peer_devices
, peer_device
, vnr
) {
1334 struct drbd_device
*device
= peer_device
->device
;
1336 if (!get_ldev(device
))
1338 kref_get(&device
->kref
);
1341 submit_one_flush(device
, &ctx
);
1347 /* Do we want to add a timeout,
1348 * if disk-timeout is set? */
1349 if (!atomic_dec_and_test(&ctx
.pending
))
1350 wait_for_completion(&ctx
.done
);
1353 /* would rather check on EOPNOTSUPP, but that is not reliable.
1354 * don't try again for ANY return value != 0
1355 * if (rv == -EOPNOTSUPP) */
1356 /* Any error is already reported by bio_endio callback. */
1357 drbd_bump_write_ordering(connection
->resource
, NULL
, WO_DRAIN_IO
);
1363 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1364 * @device: DRBD device.
1365 * @epoch: Epoch object.
1368 static enum finish_epoch
drbd_may_finish_epoch(struct drbd_connection
*connection
,
1369 struct drbd_epoch
*epoch
,
1370 enum epoch_event ev
)
1373 struct drbd_epoch
*next_epoch
;
1374 enum finish_epoch rv
= FE_STILL_LIVE
;
1376 spin_lock(&connection
->epoch_lock
);
1380 epoch_size
= atomic_read(&epoch
->epoch_size
);
1382 switch (ev
& ~EV_CLEANUP
) {
1384 atomic_dec(&epoch
->active
);
1386 case EV_GOT_BARRIER_NR
:
1387 set_bit(DE_HAVE_BARRIER_NUMBER
, &epoch
->flags
);
1389 case EV_BECAME_LAST
:
1394 if (epoch_size
!= 0 &&
1395 atomic_read(&epoch
->active
) == 0 &&
1396 (test_bit(DE_HAVE_BARRIER_NUMBER
, &epoch
->flags
) || ev
& EV_CLEANUP
)) {
1397 if (!(ev
& EV_CLEANUP
)) {
1398 spin_unlock(&connection
->epoch_lock
);
1399 drbd_send_b_ack(epoch
->connection
, epoch
->barrier_nr
, epoch_size
);
1400 spin_lock(&connection
->epoch_lock
);
1403 /* FIXME: dec unacked on connection, once we have
1404 * something to count pending connection packets in. */
1405 if (test_bit(DE_HAVE_BARRIER_NUMBER
, &epoch
->flags
))
1406 dec_unacked(epoch
->connection
);
1409 if (connection
->current_epoch
!= epoch
) {
1410 next_epoch
= list_entry(epoch
->list
.next
, struct drbd_epoch
, list
);
1411 list_del(&epoch
->list
);
1412 ev
= EV_BECAME_LAST
| (ev
& EV_CLEANUP
);
1413 connection
->epochs
--;
1416 if (rv
== FE_STILL_LIVE
)
1420 atomic_set(&epoch
->epoch_size
, 0);
1421 /* atomic_set(&epoch->active, 0); is already zero */
1422 if (rv
== FE_STILL_LIVE
)
1433 spin_unlock(&connection
->epoch_lock
);
1438 static enum write_ordering_e
1439 max_allowed_wo(struct drbd_backing_dev
*bdev
, enum write_ordering_e wo
)
1441 struct disk_conf
*dc
;
1443 dc
= rcu_dereference(bdev
->disk_conf
);
1445 if (wo
== WO_BDEV_FLUSH
&& !dc
->disk_flushes
)
1447 if (wo
== WO_DRAIN_IO
&& !dc
->disk_drain
)
1454 * drbd_bump_write_ordering() - Fall back to an other write ordering method
1455 * @connection: DRBD connection.
1456 * @wo: Write ordering method to try.
1458 void drbd_bump_write_ordering(struct drbd_resource
*resource
, struct drbd_backing_dev
*bdev
,
1459 enum write_ordering_e wo
)
1461 struct drbd_device
*device
;
1462 enum write_ordering_e pwo
;
1464 static char *write_ordering_str
[] = {
1466 [WO_DRAIN_IO
] = "drain",
1467 [WO_BDEV_FLUSH
] = "flush",
1470 pwo
= resource
->write_ordering
;
1471 if (wo
!= WO_BDEV_FLUSH
)
1474 idr_for_each_entry(&resource
->devices
, device
, vnr
) {
1475 if (get_ldev(device
)) {
1476 wo
= max_allowed_wo(device
->ldev
, wo
);
1477 if (device
->ldev
== bdev
)
1484 wo
= max_allowed_wo(bdev
, wo
);
1488 resource
->write_ordering
= wo
;
1489 if (pwo
!= resource
->write_ordering
|| wo
== WO_BDEV_FLUSH
)
1490 drbd_info(resource
, "Method to ensure write ordering: %s\n", write_ordering_str
[resource
->write_ordering
]);
1494 * Mapping "discard" to ZEROOUT with UNMAP does not work for us:
1495 * Drivers have to "announce" q->limits.max_write_zeroes_sectors, or it
1496 * will directly go to fallback mode, submitting normal writes, and
1497 * never even try to UNMAP.
1499 * And dm-thin does not do this (yet), mostly because in general it has
1500 * to assume that "skip_block_zeroing" is set. See also:
1501 * https://www.mail-archive.com/dm-devel%40redhat.com/msg07965.html
1502 * https://www.redhat.com/archives/dm-devel/2018-January/msg00271.html
1504 * We *may* ignore the discard-zeroes-data setting, if so configured.
1506 * Assumption is that this "discard_zeroes_data=0" is only because the backend
1507 * may ignore partial unaligned discards.
1509 * LVM/DM thin as of at least
1510 * LVM version: 2.02.115(2)-RHEL7 (2015-01-28)
1511 * Library version: 1.02.93-RHEL7 (2015-01-28)
1512 * Driver version: 4.29.0
1513 * still behaves this way.
1515 * For unaligned (wrt. alignment and granularity) or too small discards,
1516 * we zero-out the initial (and/or) trailing unaligned partial chunks,
1517 * but discard all the aligned full chunks.
1519 * At least for LVM/DM thin, with skip_block_zeroing=false,
1520 * the result is effectively "discard_zeroes_data=1".
1522 /* flags: EE_TRIM|EE_ZEROOUT */
1523 int drbd_issue_discard_or_zero_out(struct drbd_device
*device
, sector_t start
, unsigned int nr_sectors
, int flags
)
1525 struct block_device
*bdev
= device
->ldev
->backing_bdev
;
1526 struct request_queue
*q
= bdev_get_queue(bdev
);
1528 unsigned int max_discard_sectors
, granularity
;
1532 if ((flags
& EE_ZEROOUT
) || !(flags
& EE_TRIM
))
1535 /* Zero-sector (unknown) and one-sector granularities are the same. */
1536 granularity
= max(q
->limits
.discard_granularity
>> 9, 1U);
1537 alignment
= (bdev_discard_alignment(bdev
) >> 9) % granularity
;
1539 max_discard_sectors
= min(q
->limits
.max_discard_sectors
, (1U << 22));
1540 max_discard_sectors
-= max_discard_sectors
% granularity
;
1541 if (unlikely(!max_discard_sectors
))
1544 if (nr_sectors
< granularity
)
1548 if (sector_div(tmp
, granularity
) != alignment
) {
1549 if (nr_sectors
< 2*granularity
)
1551 /* start + gran - (start + gran - align) % gran */
1552 tmp
= start
+ granularity
- alignment
;
1553 tmp
= start
+ granularity
- sector_div(tmp
, granularity
);
1556 /* don't flag BLKDEV_ZERO_NOUNMAP, we don't know how many
1557 * layers are below us, some may have smaller granularity */
1558 err
|= blkdev_issue_zeroout(bdev
, start
, nr
, GFP_NOIO
, 0);
1562 while (nr_sectors
>= max_discard_sectors
) {
1563 err
|= blkdev_issue_discard(bdev
, start
, max_discard_sectors
, GFP_NOIO
, 0);
1564 nr_sectors
-= max_discard_sectors
;
1565 start
+= max_discard_sectors
;
1568 /* max_discard_sectors is unsigned int (and a multiple of
1569 * granularity, we made sure of that above already);
1570 * nr is < max_discard_sectors;
1571 * I don't need sector_div here, even though nr is sector_t */
1573 nr
-= (unsigned int)nr
% granularity
;
1575 err
|= blkdev_issue_discard(bdev
, start
, nr
, GFP_NOIO
, 0);
1582 err
|= blkdev_issue_zeroout(bdev
, start
, nr_sectors
, GFP_NOIO
,
1583 (flags
& EE_TRIM
) ? 0 : BLKDEV_ZERO_NOUNMAP
);
1588 static bool can_do_reliable_discards(struct drbd_device
*device
)
1590 struct request_queue
*q
= bdev_get_queue(device
->ldev
->backing_bdev
);
1591 struct disk_conf
*dc
;
1594 if (!blk_queue_discard(q
))
1598 dc
= rcu_dereference(device
->ldev
->disk_conf
);
1599 can_do
= dc
->discard_zeroes_if_aligned
;
1604 static void drbd_issue_peer_discard_or_zero_out(struct drbd_device
*device
, struct drbd_peer_request
*peer_req
)
1606 /* If the backend cannot discard, or does not guarantee
1607 * read-back zeroes in discarded ranges, we fall back to
1608 * zero-out. Unless configuration specifically requested
1610 if (!can_do_reliable_discards(device
))
1611 peer_req
->flags
|= EE_ZEROOUT
;
1613 if (drbd_issue_discard_or_zero_out(device
, peer_req
->i
.sector
,
1614 peer_req
->i
.size
>> 9, peer_req
->flags
& (EE_ZEROOUT
|EE_TRIM
)))
1615 peer_req
->flags
|= EE_WAS_ERROR
;
1616 drbd_endio_write_sec_final(peer_req
);
1619 static void drbd_issue_peer_wsame(struct drbd_device
*device
,
1620 struct drbd_peer_request
*peer_req
)
1622 struct block_device
*bdev
= device
->ldev
->backing_bdev
;
1623 sector_t s
= peer_req
->i
.sector
;
1624 sector_t nr
= peer_req
->i
.size
>> 9;
1625 if (blkdev_issue_write_same(bdev
, s
, nr
, GFP_NOIO
, peer_req
->pages
))
1626 peer_req
->flags
|= EE_WAS_ERROR
;
1627 drbd_endio_write_sec_final(peer_req
);
1632 * drbd_submit_peer_request()
1633 * @device: DRBD device.
1634 * @peer_req: peer request
1635 * @rw: flag field, see bio->bi_opf
1637 * May spread the pages to multiple bios,
1638 * depending on bio_add_page restrictions.
1640 * Returns 0 if all bios have been submitted,
1641 * -ENOMEM if we could not allocate enough bios,
1642 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1643 * single page to an empty bio (which should never happen and likely indicates
1644 * that the lower level IO stack is in some way broken). This has been observed
1645 * on certain Xen deployments.
1647 /* TODO allocate from our own bio_set. */
1648 int drbd_submit_peer_request(struct drbd_device
*device
,
1649 struct drbd_peer_request
*peer_req
,
1650 const unsigned op
, const unsigned op_flags
,
1651 const int fault_type
)
1653 struct bio
*bios
= NULL
;
1655 struct page
*page
= peer_req
->pages
;
1656 sector_t sector
= peer_req
->i
.sector
;
1657 unsigned data_size
= peer_req
->i
.size
;
1658 unsigned n_bios
= 0;
1659 unsigned nr_pages
= (data_size
+ PAGE_SIZE
-1) >> PAGE_SHIFT
;
1662 /* TRIM/DISCARD: for now, always use the helper function
1663 * blkdev_issue_zeroout(..., discard=true).
1664 * It's synchronous, but it does the right thing wrt. bio splitting.
1665 * Correctness first, performance later. Next step is to code an
1666 * asynchronous variant of the same.
1668 if (peer_req
->flags
& (EE_TRIM
|EE_WRITE_SAME
|EE_ZEROOUT
)) {
1669 /* wait for all pending IO completions, before we start
1670 * zeroing things out. */
1671 conn_wait_active_ee_empty(peer_req
->peer_device
->connection
);
1672 /* add it to the active list now,
1673 * so we can find it to present it in debugfs */
1674 peer_req
->submit_jif
= jiffies
;
1675 peer_req
->flags
|= EE_SUBMITTED
;
1677 /* If this was a resync request from receive_rs_deallocated(),
1678 * it is already on the sync_ee list */
1679 if (list_empty(&peer_req
->w
.list
)) {
1680 spin_lock_irq(&device
->resource
->req_lock
);
1681 list_add_tail(&peer_req
->w
.list
, &device
->active_ee
);
1682 spin_unlock_irq(&device
->resource
->req_lock
);
1685 if (peer_req
->flags
& (EE_TRIM
|EE_ZEROOUT
))
1686 drbd_issue_peer_discard_or_zero_out(device
, peer_req
);
1687 else /* EE_WRITE_SAME */
1688 drbd_issue_peer_wsame(device
, peer_req
);
1692 /* In most cases, we will only need one bio. But in case the lower
1693 * level restrictions happen to be different at this offset on this
1694 * side than those of the sending peer, we may need to submit the
1695 * request in more than one bio.
1697 * Plain bio_alloc is good enough here, this is no DRBD internally
1698 * generated bio, but a bio allocated on behalf of the peer.
1701 bio
= bio_alloc(GFP_NOIO
, nr_pages
);
1703 drbd_err(device
, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages
);
1706 /* > peer_req->i.sector, unless this is the first bio */
1707 bio
->bi_iter
.bi_sector
= sector
;
1708 bio_set_dev(bio
, device
->ldev
->backing_bdev
);
1709 bio_set_op_attrs(bio
, op
, op_flags
);
1710 bio
->bi_private
= peer_req
;
1711 bio
->bi_end_io
= drbd_peer_request_endio
;
1713 bio
->bi_next
= bios
;
1717 page_chain_for_each(page
) {
1718 unsigned len
= min_t(unsigned, data_size
, PAGE_SIZE
);
1719 if (!bio_add_page(bio
, page
, len
, 0))
1725 D_ASSERT(device
, data_size
== 0);
1726 D_ASSERT(device
, page
== NULL
);
1728 atomic_set(&peer_req
->pending_bios
, n_bios
);
1729 /* for debugfs: update timestamp, mark as submitted */
1730 peer_req
->submit_jif
= jiffies
;
1731 peer_req
->flags
|= EE_SUBMITTED
;
1734 bios
= bios
->bi_next
;
1735 bio
->bi_next
= NULL
;
1737 drbd_generic_make_request(device
, fault_type
, bio
);
1744 bios
= bios
->bi_next
;
1750 static void drbd_remove_epoch_entry_interval(struct drbd_device
*device
,
1751 struct drbd_peer_request
*peer_req
)
1753 struct drbd_interval
*i
= &peer_req
->i
;
1755 drbd_remove_interval(&device
->write_requests
, i
);
1756 drbd_clear_interval(i
);
1758 /* Wake up any processes waiting for this peer request to complete. */
1760 wake_up(&device
->misc_wait
);
1763 static void conn_wait_active_ee_empty(struct drbd_connection
*connection
)
1765 struct drbd_peer_device
*peer_device
;
1769 idr_for_each_entry(&connection
->peer_devices
, peer_device
, vnr
) {
1770 struct drbd_device
*device
= peer_device
->device
;
1772 kref_get(&device
->kref
);
1774 drbd_wait_ee_list_empty(device
, &device
->active_ee
);
1775 kref_put(&device
->kref
, drbd_destroy_device
);
1781 static int receive_Barrier(struct drbd_connection
*connection
, struct packet_info
*pi
)
1784 struct p_barrier
*p
= pi
->data
;
1785 struct drbd_epoch
*epoch
;
1787 /* FIXME these are unacked on connection,
1788 * not a specific (peer)device.
1790 connection
->current_epoch
->barrier_nr
= p
->barrier
;
1791 connection
->current_epoch
->connection
= connection
;
1792 rv
= drbd_may_finish_epoch(connection
, connection
->current_epoch
, EV_GOT_BARRIER_NR
);
1794 /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1795 * the activity log, which means it would not be resynced in case the
1796 * R_PRIMARY crashes now.
1797 * Therefore we must send the barrier_ack after the barrier request was
1799 switch (connection
->resource
->write_ordering
) {
1801 if (rv
== FE_RECYCLED
)
1804 /* receiver context, in the writeout path of the other node.
1805 * avoid potential distributed deadlock */
1806 epoch
= kmalloc(sizeof(struct drbd_epoch
), GFP_NOIO
);
1810 drbd_warn(connection
, "Allocation of an epoch failed, slowing down\n");
1815 conn_wait_active_ee_empty(connection
);
1816 drbd_flush(connection
);
1818 if (atomic_read(&connection
->current_epoch
->epoch_size
)) {
1819 epoch
= kmalloc(sizeof(struct drbd_epoch
), GFP_NOIO
);
1826 drbd_err(connection
, "Strangeness in connection->write_ordering %d\n",
1827 connection
->resource
->write_ordering
);
1832 atomic_set(&epoch
->epoch_size
, 0);
1833 atomic_set(&epoch
->active
, 0);
1835 spin_lock(&connection
->epoch_lock
);
1836 if (atomic_read(&connection
->current_epoch
->epoch_size
)) {
1837 list_add(&epoch
->list
, &connection
->current_epoch
->list
);
1838 connection
->current_epoch
= epoch
;
1839 connection
->epochs
++;
1841 /* The current_epoch got recycled while we allocated this one... */
1844 spin_unlock(&connection
->epoch_lock
);
1849 /* quick wrapper in case payload size != request_size (write same) */
1850 static void drbd_csum_ee_size(struct crypto_shash
*h
,
1851 struct drbd_peer_request
*r
, void *d
,
1852 unsigned int payload_size
)
1854 unsigned int tmp
= r
->i
.size
;
1855 r
->i
.size
= payload_size
;
1856 drbd_csum_ee(h
, r
, d
);
1860 /* used from receive_RSDataReply (recv_resync_read)
1861 * and from receive_Data.
1862 * data_size: actual payload ("data in")
1863 * for normal writes that is bi_size.
1864 * for discards, that is zero.
1865 * for write same, it is logical_block_size.
1866 * both trim and write same have the bi_size ("data len to be affected")
1867 * as extra argument in the packet header.
1869 static struct drbd_peer_request
*
1870 read_in_block(struct drbd_peer_device
*peer_device
, u64 id
, sector_t sector
,
1871 struct packet_info
*pi
) __must_hold(local
)
1873 struct drbd_device
*device
= peer_device
->device
;
1874 const sector_t capacity
= drbd_get_capacity(device
->this_bdev
);
1875 struct drbd_peer_request
*peer_req
;
1877 int digest_size
, err
;
1878 unsigned int data_size
= pi
->size
, ds
;
1879 void *dig_in
= peer_device
->connection
->int_dig_in
;
1880 void *dig_vv
= peer_device
->connection
->int_dig_vv
;
1881 unsigned long *data
;
1882 struct p_trim
*trim
= (pi
->cmd
== P_TRIM
) ? pi
->data
: NULL
;
1883 struct p_trim
*zeroes
= (pi
->cmd
== P_ZEROES
) ? pi
->data
: NULL
;
1884 struct p_trim
*wsame
= (pi
->cmd
== P_WSAME
) ? pi
->data
: NULL
;
1887 if (!trim
&& peer_device
->connection
->peer_integrity_tfm
) {
1888 digest_size
= crypto_shash_digestsize(peer_device
->connection
->peer_integrity_tfm
);
1890 * FIXME: Receive the incoming digest into the receive buffer
1891 * here, together with its struct p_data?
1893 err
= drbd_recv_all_warn(peer_device
->connection
, dig_in
, digest_size
);
1896 data_size
-= digest_size
;
1899 /* assume request_size == data_size, but special case trim and wsame. */
1902 if (!expect(data_size
== 0))
1904 ds
= be32_to_cpu(trim
->size
);
1905 } else if (zeroes
) {
1906 if (!expect(data_size
== 0))
1908 ds
= be32_to_cpu(zeroes
->size
);
1910 if (data_size
!= queue_logical_block_size(device
->rq_queue
)) {
1911 drbd_err(peer_device
, "data size (%u) != drbd logical block size (%u)\n",
1912 data_size
, queue_logical_block_size(device
->rq_queue
));
1915 if (data_size
!= bdev_logical_block_size(device
->ldev
->backing_bdev
)) {
1916 drbd_err(peer_device
, "data size (%u) != backend logical block size (%u)\n",
1917 data_size
, bdev_logical_block_size(device
->ldev
->backing_bdev
));
1920 ds
= be32_to_cpu(wsame
->size
);
1923 if (!expect(IS_ALIGNED(ds
, 512)))
1925 if (trim
|| wsame
|| zeroes
) {
1926 if (!expect(ds
<= (DRBD_MAX_BBIO_SECTORS
<< 9)))
1928 } else if (!expect(ds
<= DRBD_MAX_BIO_SIZE
))
1931 /* even though we trust out peer,
1932 * we sometimes have to double check. */
1933 if (sector
+ (ds
>>9) > capacity
) {
1934 drbd_err(device
, "request from peer beyond end of local disk: "
1935 "capacity: %llus < sector: %llus + size: %u\n",
1936 (unsigned long long)capacity
,
1937 (unsigned long long)sector
, ds
);
1941 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1942 * "criss-cross" setup, that might cause write-out on some other DRBD,
1943 * which in turn might block on the other node at this very place. */
1944 peer_req
= drbd_alloc_peer_req(peer_device
, id
, sector
, ds
, data_size
, GFP_NOIO
);
1948 peer_req
->flags
|= EE_WRITE
;
1950 peer_req
->flags
|= EE_TRIM
;
1954 peer_req
->flags
|= EE_ZEROOUT
;
1958 peer_req
->flags
|= EE_WRITE_SAME
;
1960 /* receive payload size bytes into page chain */
1962 page
= peer_req
->pages
;
1963 page_chain_for_each(page
) {
1964 unsigned len
= min_t(int, ds
, PAGE_SIZE
);
1966 err
= drbd_recv_all_warn(peer_device
->connection
, data
, len
);
1967 if (drbd_insert_fault(device
, DRBD_FAULT_RECEIVE
)) {
1968 drbd_err(device
, "Fault injection: Corrupting data on receive\n");
1969 data
[0] = data
[0] ^ (unsigned long)-1;
1973 drbd_free_peer_req(device
, peer_req
);
1980 drbd_csum_ee_size(peer_device
->connection
->peer_integrity_tfm
, peer_req
, dig_vv
, data_size
);
1981 if (memcmp(dig_in
, dig_vv
, digest_size
)) {
1982 drbd_err(device
, "Digest integrity check FAILED: %llus +%u\n",
1983 (unsigned long long)sector
, data_size
);
1984 drbd_free_peer_req(device
, peer_req
);
1988 device
->recv_cnt
+= data_size
>> 9;
1992 /* drbd_drain_block() just takes a data block
1993 * out of the socket input buffer, and discards it.
1995 static int drbd_drain_block(struct drbd_peer_device
*peer_device
, int data_size
)
2004 page
= drbd_alloc_pages(peer_device
, 1, 1);
2008 unsigned int len
= min_t(int, data_size
, PAGE_SIZE
);
2010 err
= drbd_recv_all_warn(peer_device
->connection
, data
, len
);
2016 drbd_free_pages(peer_device
->device
, page
, 0);
2020 static int recv_dless_read(struct drbd_peer_device
*peer_device
, struct drbd_request
*req
,
2021 sector_t sector
, int data_size
)
2023 struct bio_vec bvec
;
2024 struct bvec_iter iter
;
2026 int digest_size
, err
, expect
;
2027 void *dig_in
= peer_device
->connection
->int_dig_in
;
2028 void *dig_vv
= peer_device
->connection
->int_dig_vv
;
2031 if (peer_device
->connection
->peer_integrity_tfm
) {
2032 digest_size
= crypto_shash_digestsize(peer_device
->connection
->peer_integrity_tfm
);
2033 err
= drbd_recv_all_warn(peer_device
->connection
, dig_in
, digest_size
);
2036 data_size
-= digest_size
;
2039 /* optimistically update recv_cnt. if receiving fails below,
2040 * we disconnect anyways, and counters will be reset. */
2041 peer_device
->device
->recv_cnt
+= data_size
>>9;
2043 bio
= req
->master_bio
;
2044 D_ASSERT(peer_device
->device
, sector
== bio
->bi_iter
.bi_sector
);
2046 bio_for_each_segment(bvec
, bio
, iter
) {
2047 void *mapped
= kmap(bvec
.bv_page
) + bvec
.bv_offset
;
2048 expect
= min_t(int, data_size
, bvec
.bv_len
);
2049 err
= drbd_recv_all_warn(peer_device
->connection
, mapped
, expect
);
2050 kunmap(bvec
.bv_page
);
2053 data_size
-= expect
;
2057 drbd_csum_bio(peer_device
->connection
->peer_integrity_tfm
, bio
, dig_vv
);
2058 if (memcmp(dig_in
, dig_vv
, digest_size
)) {
2059 drbd_err(peer_device
, "Digest integrity check FAILED. Broken NICs?\n");
2064 D_ASSERT(peer_device
->device
, data_size
== 0);
2069 * e_end_resync_block() is called in ack_sender context via
2070 * drbd_finish_peer_reqs().
2072 static int e_end_resync_block(struct drbd_work
*w
, int unused
)
2074 struct drbd_peer_request
*peer_req
=
2075 container_of(w
, struct drbd_peer_request
, w
);
2076 struct drbd_peer_device
*peer_device
= peer_req
->peer_device
;
2077 struct drbd_device
*device
= peer_device
->device
;
2078 sector_t sector
= peer_req
->i
.sector
;
2081 D_ASSERT(device
, drbd_interval_empty(&peer_req
->i
));
2083 if (likely((peer_req
->flags
& EE_WAS_ERROR
) == 0)) {
2084 drbd_set_in_sync(device
, sector
, peer_req
->i
.size
);
2085 err
= drbd_send_ack(peer_device
, P_RS_WRITE_ACK
, peer_req
);
2087 /* Record failure to sync */
2088 drbd_rs_failed_io(device
, sector
, peer_req
->i
.size
);
2090 err
= drbd_send_ack(peer_device
, P_NEG_ACK
, peer_req
);
2092 dec_unacked(device
);
2097 static int recv_resync_read(struct drbd_peer_device
*peer_device
, sector_t sector
,
2098 struct packet_info
*pi
) __releases(local
)
2100 struct drbd_device
*device
= peer_device
->device
;
2101 struct drbd_peer_request
*peer_req
;
2103 peer_req
= read_in_block(peer_device
, ID_SYNCER
, sector
, pi
);
2107 dec_rs_pending(device
);
2109 inc_unacked(device
);
2110 /* corresponding dec_unacked() in e_end_resync_block()
2111 * respective _drbd_clear_done_ee */
2113 peer_req
->w
.cb
= e_end_resync_block
;
2114 peer_req
->submit_jif
= jiffies
;
2116 spin_lock_irq(&device
->resource
->req_lock
);
2117 list_add_tail(&peer_req
->w
.list
, &device
->sync_ee
);
2118 spin_unlock_irq(&device
->resource
->req_lock
);
2120 atomic_add(pi
->size
>> 9, &device
->rs_sect_ev
);
2121 if (drbd_submit_peer_request(device
, peer_req
, REQ_OP_WRITE
, 0,
2122 DRBD_FAULT_RS_WR
) == 0)
2125 /* don't care for the reason here */
2126 drbd_err(device
, "submit failed, triggering re-connect\n");
2127 spin_lock_irq(&device
->resource
->req_lock
);
2128 list_del(&peer_req
->w
.list
);
2129 spin_unlock_irq(&device
->resource
->req_lock
);
2131 drbd_free_peer_req(device
, peer_req
);
2137 static struct drbd_request
*
2138 find_request(struct drbd_device
*device
, struct rb_root
*root
, u64 id
,
2139 sector_t sector
, bool missing_ok
, const char *func
)
2141 struct drbd_request
*req
;
2143 /* Request object according to our peer */
2144 req
= (struct drbd_request
*)(unsigned long)id
;
2145 if (drbd_contains_interval(root
, sector
, &req
->i
) && req
->i
.local
)
2148 drbd_err(device
, "%s: failed to find request 0x%lx, sector %llus\n", func
,
2149 (unsigned long)id
, (unsigned long long)sector
);
2154 static int receive_DataReply(struct drbd_connection
*connection
, struct packet_info
*pi
)
2156 struct drbd_peer_device
*peer_device
;
2157 struct drbd_device
*device
;
2158 struct drbd_request
*req
;
2161 struct p_data
*p
= pi
->data
;
2163 peer_device
= conn_peer_device(connection
, pi
->vnr
);
2166 device
= peer_device
->device
;
2168 sector
= be64_to_cpu(p
->sector
);
2170 spin_lock_irq(&device
->resource
->req_lock
);
2171 req
= find_request(device
, &device
->read_requests
, p
->block_id
, sector
, false, __func__
);
2172 spin_unlock_irq(&device
->resource
->req_lock
);
2176 /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
2177 * special casing it there for the various failure cases.
2178 * still no race with drbd_fail_pending_reads */
2179 err
= recv_dless_read(peer_device
, req
, sector
, pi
->size
);
2181 req_mod(req
, DATA_RECEIVED
);
2182 /* else: nothing. handled from drbd_disconnect...
2183 * I don't think we may complete this just yet
2184 * in case we are "on-disconnect: freeze" */
2189 static int receive_RSDataReply(struct drbd_connection
*connection
, struct packet_info
*pi
)
2191 struct drbd_peer_device
*peer_device
;
2192 struct drbd_device
*device
;
2195 struct p_data
*p
= pi
->data
;
2197 peer_device
= conn_peer_device(connection
, pi
->vnr
);
2200 device
= peer_device
->device
;
2202 sector
= be64_to_cpu(p
->sector
);
2203 D_ASSERT(device
, p
->block_id
== ID_SYNCER
);
2205 if (get_ldev(device
)) {
2206 /* data is submitted to disk within recv_resync_read.
2207 * corresponding put_ldev done below on error,
2208 * or in drbd_peer_request_endio. */
2209 err
= recv_resync_read(peer_device
, sector
, pi
);
2211 if (__ratelimit(&drbd_ratelimit_state
))
2212 drbd_err(device
, "Can not write resync data to local disk.\n");
2214 err
= drbd_drain_block(peer_device
, pi
->size
);
2216 drbd_send_ack_dp(peer_device
, P_NEG_ACK
, p
, pi
->size
);
2219 atomic_add(pi
->size
>> 9, &device
->rs_sect_in
);
2224 static void restart_conflicting_writes(struct drbd_device
*device
,
2225 sector_t sector
, int size
)
2227 struct drbd_interval
*i
;
2228 struct drbd_request
*req
;
2230 drbd_for_each_overlap(i
, &device
->write_requests
, sector
, size
) {
2233 req
= container_of(i
, struct drbd_request
, i
);
2234 if (req
->rq_state
& RQ_LOCAL_PENDING
||
2235 !(req
->rq_state
& RQ_POSTPONED
))
2237 /* as it is RQ_POSTPONED, this will cause it to
2238 * be queued on the retry workqueue. */
2239 __req_mod(req
, CONFLICT_RESOLVED
, NULL
);
2244 * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
2246 static int e_end_block(struct drbd_work
*w
, int cancel
)
2248 struct drbd_peer_request
*peer_req
=
2249 container_of(w
, struct drbd_peer_request
, w
);
2250 struct drbd_peer_device
*peer_device
= peer_req
->peer_device
;
2251 struct drbd_device
*device
= peer_device
->device
;
2252 sector_t sector
= peer_req
->i
.sector
;
2255 if (peer_req
->flags
& EE_SEND_WRITE_ACK
) {
2256 if (likely((peer_req
->flags
& EE_WAS_ERROR
) == 0)) {
2257 pcmd
= (device
->state
.conn
>= C_SYNC_SOURCE
&&
2258 device
->state
.conn
<= C_PAUSED_SYNC_T
&&
2259 peer_req
->flags
& EE_MAY_SET_IN_SYNC
) ?
2260 P_RS_WRITE_ACK
: P_WRITE_ACK
;
2261 err
= drbd_send_ack(peer_device
, pcmd
, peer_req
);
2262 if (pcmd
== P_RS_WRITE_ACK
)
2263 drbd_set_in_sync(device
, sector
, peer_req
->i
.size
);
2265 err
= drbd_send_ack(peer_device
, P_NEG_ACK
, peer_req
);
2266 /* we expect it to be marked out of sync anyways...
2267 * maybe assert this? */
2269 dec_unacked(device
);
2272 /* we delete from the conflict detection hash _after_ we sent out the
2273 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
2274 if (peer_req
->flags
& EE_IN_INTERVAL_TREE
) {
2275 spin_lock_irq(&device
->resource
->req_lock
);
2276 D_ASSERT(device
, !drbd_interval_empty(&peer_req
->i
));
2277 drbd_remove_epoch_entry_interval(device
, peer_req
);
2278 if (peer_req
->flags
& EE_RESTART_REQUESTS
)
2279 restart_conflicting_writes(device
, sector
, peer_req
->i
.size
);
2280 spin_unlock_irq(&device
->resource
->req_lock
);
2282 D_ASSERT(device
, drbd_interval_empty(&peer_req
->i
));
2284 drbd_may_finish_epoch(peer_device
->connection
, peer_req
->epoch
, EV_PUT
+ (cancel
? EV_CLEANUP
: 0));
2289 static int e_send_ack(struct drbd_work
*w
, enum drbd_packet ack
)
2291 struct drbd_peer_request
*peer_req
=
2292 container_of(w
, struct drbd_peer_request
, w
);
2293 struct drbd_peer_device
*peer_device
= peer_req
->peer_device
;
2296 err
= drbd_send_ack(peer_device
, ack
, peer_req
);
2297 dec_unacked(peer_device
->device
);
2302 static int e_send_superseded(struct drbd_work
*w
, int unused
)
2304 return e_send_ack(w
, P_SUPERSEDED
);
2307 static int e_send_retry_write(struct drbd_work
*w
, int unused
)
2309 struct drbd_peer_request
*peer_req
=
2310 container_of(w
, struct drbd_peer_request
, w
);
2311 struct drbd_connection
*connection
= peer_req
->peer_device
->connection
;
2313 return e_send_ack(w
, connection
->agreed_pro_version
>= 100 ?
2314 P_RETRY_WRITE
: P_SUPERSEDED
);
2317 static bool seq_greater(u32 a
, u32 b
)
2320 * We assume 32-bit wrap-around here.
2321 * For 24-bit wrap-around, we would have to shift:
2324 return (s32
)a
- (s32
)b
> 0;
2327 static u32
seq_max(u32 a
, u32 b
)
2329 return seq_greater(a
, b
) ? a
: b
;
2332 static void update_peer_seq(struct drbd_peer_device
*peer_device
, unsigned int peer_seq
)
2334 struct drbd_device
*device
= peer_device
->device
;
2335 unsigned int newest_peer_seq
;
2337 if (test_bit(RESOLVE_CONFLICTS
, &peer_device
->connection
->flags
)) {
2338 spin_lock(&device
->peer_seq_lock
);
2339 newest_peer_seq
= seq_max(device
->peer_seq
, peer_seq
);
2340 device
->peer_seq
= newest_peer_seq
;
2341 spin_unlock(&device
->peer_seq_lock
);
2342 /* wake up only if we actually changed device->peer_seq */
2343 if (peer_seq
== newest_peer_seq
)
2344 wake_up(&device
->seq_wait
);
2348 static inline int overlaps(sector_t s1
, int l1
, sector_t s2
, int l2
)
2350 return !((s1
+ (l1
>>9) <= s2
) || (s1
>= s2
+ (l2
>>9)));
2353 /* maybe change sync_ee into interval trees as well? */
2354 static bool overlapping_resync_write(struct drbd_device
*device
, struct drbd_peer_request
*peer_req
)
2356 struct drbd_peer_request
*rs_req
;
2359 spin_lock_irq(&device
->resource
->req_lock
);
2360 list_for_each_entry(rs_req
, &device
->sync_ee
, w
.list
) {
2361 if (overlaps(peer_req
->i
.sector
, peer_req
->i
.size
,
2362 rs_req
->i
.sector
, rs_req
->i
.size
)) {
2367 spin_unlock_irq(&device
->resource
->req_lock
);
2372 /* Called from receive_Data.
2373 * Synchronize packets on sock with packets on msock.
2375 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
2376 * packet traveling on msock, they are still processed in the order they have
2379 * Note: we don't care for Ack packets overtaking P_DATA packets.
2381 * In case packet_seq is larger than device->peer_seq number, there are
2382 * outstanding packets on the msock. We wait for them to arrive.
2383 * In case we are the logically next packet, we update device->peer_seq
2384 * ourselves. Correctly handles 32bit wrap around.
2386 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2387 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2388 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2389 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
2391 * returns 0 if we may process the packet,
2392 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
2393 static int wait_for_and_update_peer_seq(struct drbd_peer_device
*peer_device
, const u32 peer_seq
)
2395 struct drbd_device
*device
= peer_device
->device
;
2400 if (!test_bit(RESOLVE_CONFLICTS
, &peer_device
->connection
->flags
))
2403 spin_lock(&device
->peer_seq_lock
);
2405 if (!seq_greater(peer_seq
- 1, device
->peer_seq
)) {
2406 device
->peer_seq
= seq_max(device
->peer_seq
, peer_seq
);
2410 if (signal_pending(current
)) {
2416 tp
= rcu_dereference(peer_device
->connection
->net_conf
)->two_primaries
;
2422 /* Only need to wait if two_primaries is enabled */
2423 prepare_to_wait(&device
->seq_wait
, &wait
, TASK_INTERRUPTIBLE
);
2424 spin_unlock(&device
->peer_seq_lock
);
2426 timeout
= rcu_dereference(peer_device
->connection
->net_conf
)->ping_timeo
*HZ
/10;
2428 timeout
= schedule_timeout(timeout
);
2429 spin_lock(&device
->peer_seq_lock
);
2432 drbd_err(device
, "Timed out waiting for missing ack packets; disconnecting\n");
2436 spin_unlock(&device
->peer_seq_lock
);
2437 finish_wait(&device
->seq_wait
, &wait
);
2441 /* see also bio_flags_to_wire()
2442 * DRBD_REQ_*, because we need to semantically map the flags to data packet
2443 * flags and back. We may replicate to other kernel versions. */
2444 static unsigned long wire_flags_to_bio_flags(u32 dpf
)
2446 return (dpf
& DP_RW_SYNC
? REQ_SYNC
: 0) |
2447 (dpf
& DP_FUA
? REQ_FUA
: 0) |
2448 (dpf
& DP_FLUSH
? REQ_PREFLUSH
: 0);
2451 static unsigned long wire_flags_to_bio_op(u32 dpf
)
2453 if (dpf
& DP_ZEROES
)
2454 return REQ_OP_WRITE_ZEROES
;
2455 if (dpf
& DP_DISCARD
)
2456 return REQ_OP_DISCARD
;
2458 return REQ_OP_WRITE_SAME
;
2460 return REQ_OP_WRITE
;
2463 static void fail_postponed_requests(struct drbd_device
*device
, sector_t sector
,
2466 struct drbd_interval
*i
;
2469 drbd_for_each_overlap(i
, &device
->write_requests
, sector
, size
) {
2470 struct drbd_request
*req
;
2471 struct bio_and_error m
;
2475 req
= container_of(i
, struct drbd_request
, i
);
2476 if (!(req
->rq_state
& RQ_POSTPONED
))
2478 req
->rq_state
&= ~RQ_POSTPONED
;
2479 __req_mod(req
, NEG_ACKED
, &m
);
2480 spin_unlock_irq(&device
->resource
->req_lock
);
2482 complete_master_bio(device
, &m
);
2483 spin_lock_irq(&device
->resource
->req_lock
);
2488 static int handle_write_conflicts(struct drbd_device
*device
,
2489 struct drbd_peer_request
*peer_req
)
2491 struct drbd_connection
*connection
= peer_req
->peer_device
->connection
;
2492 bool resolve_conflicts
= test_bit(RESOLVE_CONFLICTS
, &connection
->flags
);
2493 sector_t sector
= peer_req
->i
.sector
;
2494 const unsigned int size
= peer_req
->i
.size
;
2495 struct drbd_interval
*i
;
2500 * Inserting the peer request into the write_requests tree will prevent
2501 * new conflicting local requests from being added.
2503 drbd_insert_interval(&device
->write_requests
, &peer_req
->i
);
2506 drbd_for_each_overlap(i
, &device
->write_requests
, sector
, size
) {
2507 if (i
== &peer_req
->i
)
2514 * Our peer has sent a conflicting remote request; this
2515 * should not happen in a two-node setup. Wait for the
2516 * earlier peer request to complete.
2518 err
= drbd_wait_misc(device
, i
);
2524 equal
= i
->sector
== sector
&& i
->size
== size
;
2525 if (resolve_conflicts
) {
2527 * If the peer request is fully contained within the
2528 * overlapping request, it can be considered overwritten
2529 * and thus superseded; otherwise, it will be retried
2530 * once all overlapping requests have completed.
2532 bool superseded
= i
->sector
<= sector
&& i
->sector
+
2533 (i
->size
>> 9) >= sector
+ (size
>> 9);
2536 drbd_alert(device
, "Concurrent writes detected: "
2537 "local=%llus +%u, remote=%llus +%u, "
2538 "assuming %s came first\n",
2539 (unsigned long long)i
->sector
, i
->size
,
2540 (unsigned long long)sector
, size
,
2541 superseded
? "local" : "remote");
2543 peer_req
->w
.cb
= superseded
? e_send_superseded
:
2545 list_add_tail(&peer_req
->w
.list
, &device
->done_ee
);
2546 queue_work(connection
->ack_sender
, &peer_req
->peer_device
->send_acks_work
);
2551 struct drbd_request
*req
=
2552 container_of(i
, struct drbd_request
, i
);
2555 drbd_alert(device
, "Concurrent writes detected: "
2556 "local=%llus +%u, remote=%llus +%u\n",
2557 (unsigned long long)i
->sector
, i
->size
,
2558 (unsigned long long)sector
, size
);
2560 if (req
->rq_state
& RQ_LOCAL_PENDING
||
2561 !(req
->rq_state
& RQ_POSTPONED
)) {
2563 * Wait for the node with the discard flag to
2564 * decide if this request has been superseded
2565 * or needs to be retried.
2566 * Requests that have been superseded will
2567 * disappear from the write_requests tree.
2569 * In addition, wait for the conflicting
2570 * request to finish locally before submitting
2571 * the conflicting peer request.
2573 err
= drbd_wait_misc(device
, &req
->i
);
2575 _conn_request_state(connection
, NS(conn
, C_TIMEOUT
), CS_HARD
);
2576 fail_postponed_requests(device
, sector
, size
);
2582 * Remember to restart the conflicting requests after
2583 * the new peer request has completed.
2585 peer_req
->flags
|= EE_RESTART_REQUESTS
;
2592 drbd_remove_epoch_entry_interval(device
, peer_req
);
2596 /* mirrored write */
2597 static int receive_Data(struct drbd_connection
*connection
, struct packet_info
*pi
)
2599 struct drbd_peer_device
*peer_device
;
2600 struct drbd_device
*device
;
2601 struct net_conf
*nc
;
2603 struct drbd_peer_request
*peer_req
;
2604 struct p_data
*p
= pi
->data
;
2605 u32 peer_seq
= be32_to_cpu(p
->seq_num
);
2610 peer_device
= conn_peer_device(connection
, pi
->vnr
);
2613 device
= peer_device
->device
;
2615 if (!get_ldev(device
)) {
2618 err
= wait_for_and_update_peer_seq(peer_device
, peer_seq
);
2619 drbd_send_ack_dp(peer_device
, P_NEG_ACK
, p
, pi
->size
);
2620 atomic_inc(&connection
->current_epoch
->epoch_size
);
2621 err2
= drbd_drain_block(peer_device
, pi
->size
);
2628 * Corresponding put_ldev done either below (on various errors), or in
2629 * drbd_peer_request_endio, if we successfully submit the data at the
2630 * end of this function.
2633 sector
= be64_to_cpu(p
->sector
);
2634 peer_req
= read_in_block(peer_device
, p
->block_id
, sector
, pi
);
2640 peer_req
->w
.cb
= e_end_block
;
2641 peer_req
->submit_jif
= jiffies
;
2642 peer_req
->flags
|= EE_APPLICATION
;
2644 dp_flags
= be32_to_cpu(p
->dp_flags
);
2645 op
= wire_flags_to_bio_op(dp_flags
);
2646 op_flags
= wire_flags_to_bio_flags(dp_flags
);
2647 if (pi
->cmd
== P_TRIM
) {
2648 D_ASSERT(peer_device
, peer_req
->i
.size
> 0);
2649 D_ASSERT(peer_device
, op
== REQ_OP_DISCARD
);
2650 D_ASSERT(peer_device
, peer_req
->pages
== NULL
);
2651 /* need to play safe: an older DRBD sender
2652 * may mean zero-out while sending P_TRIM. */
2653 if (0 == (connection
->agreed_features
& DRBD_FF_WZEROES
))
2654 peer_req
->flags
|= EE_ZEROOUT
;
2655 } else if (pi
->cmd
== P_ZEROES
) {
2656 D_ASSERT(peer_device
, peer_req
->i
.size
> 0);
2657 D_ASSERT(peer_device
, op
== REQ_OP_WRITE_ZEROES
);
2658 D_ASSERT(peer_device
, peer_req
->pages
== NULL
);
2659 /* Do (not) pass down BLKDEV_ZERO_NOUNMAP? */
2660 if (dp_flags
& DP_DISCARD
)
2661 peer_req
->flags
|= EE_TRIM
;
2662 } else if (peer_req
->pages
== NULL
) {
2663 D_ASSERT(device
, peer_req
->i
.size
== 0);
2664 D_ASSERT(device
, dp_flags
& DP_FLUSH
);
2667 if (dp_flags
& DP_MAY_SET_IN_SYNC
)
2668 peer_req
->flags
|= EE_MAY_SET_IN_SYNC
;
2670 spin_lock(&connection
->epoch_lock
);
2671 peer_req
->epoch
= connection
->current_epoch
;
2672 atomic_inc(&peer_req
->epoch
->epoch_size
);
2673 atomic_inc(&peer_req
->epoch
->active
);
2674 spin_unlock(&connection
->epoch_lock
);
2677 nc
= rcu_dereference(peer_device
->connection
->net_conf
);
2678 tp
= nc
->two_primaries
;
2679 if (peer_device
->connection
->agreed_pro_version
< 100) {
2680 switch (nc
->wire_protocol
) {
2682 dp_flags
|= DP_SEND_WRITE_ACK
;
2685 dp_flags
|= DP_SEND_RECEIVE_ACK
;
2691 if (dp_flags
& DP_SEND_WRITE_ACK
) {
2692 peer_req
->flags
|= EE_SEND_WRITE_ACK
;
2693 inc_unacked(device
);
2694 /* corresponding dec_unacked() in e_end_block()
2695 * respective _drbd_clear_done_ee */
2698 if (dp_flags
& DP_SEND_RECEIVE_ACK
) {
2699 /* I really don't like it that the receiver thread
2700 * sends on the msock, but anyways */
2701 drbd_send_ack(peer_device
, P_RECV_ACK
, peer_req
);
2705 /* two primaries implies protocol C */
2706 D_ASSERT(device
, dp_flags
& DP_SEND_WRITE_ACK
);
2707 peer_req
->flags
|= EE_IN_INTERVAL_TREE
;
2708 err
= wait_for_and_update_peer_seq(peer_device
, peer_seq
);
2710 goto out_interrupted
;
2711 spin_lock_irq(&device
->resource
->req_lock
);
2712 err
= handle_write_conflicts(device
, peer_req
);
2714 spin_unlock_irq(&device
->resource
->req_lock
);
2715 if (err
== -ENOENT
) {
2719 goto out_interrupted
;
2722 update_peer_seq(peer_device
, peer_seq
);
2723 spin_lock_irq(&device
->resource
->req_lock
);
2725 /* TRIM and WRITE_SAME are processed synchronously,
2726 * we wait for all pending requests, respectively wait for
2727 * active_ee to become empty in drbd_submit_peer_request();
2728 * better not add ourselves here. */
2729 if ((peer_req
->flags
& (EE_TRIM
|EE_WRITE_SAME
|EE_ZEROOUT
)) == 0)
2730 list_add_tail(&peer_req
->w
.list
, &device
->active_ee
);
2731 spin_unlock_irq(&device
->resource
->req_lock
);
2733 if (device
->state
.conn
== C_SYNC_TARGET
)
2734 wait_event(device
->ee_wait
, !overlapping_resync_write(device
, peer_req
));
2736 if (device
->state
.pdsk
< D_INCONSISTENT
) {
2737 /* In case we have the only disk of the cluster, */
2738 drbd_set_out_of_sync(device
, peer_req
->i
.sector
, peer_req
->i
.size
);
2739 peer_req
->flags
&= ~EE_MAY_SET_IN_SYNC
;
2740 drbd_al_begin_io(device
, &peer_req
->i
);
2741 peer_req
->flags
|= EE_CALL_AL_COMPLETE_IO
;
2744 err
= drbd_submit_peer_request(device
, peer_req
, op
, op_flags
,
2749 /* don't care for the reason here */
2750 drbd_err(device
, "submit failed, triggering re-connect\n");
2751 spin_lock_irq(&device
->resource
->req_lock
);
2752 list_del(&peer_req
->w
.list
);
2753 drbd_remove_epoch_entry_interval(device
, peer_req
);
2754 spin_unlock_irq(&device
->resource
->req_lock
);
2755 if (peer_req
->flags
& EE_CALL_AL_COMPLETE_IO
) {
2756 peer_req
->flags
&= ~EE_CALL_AL_COMPLETE_IO
;
2757 drbd_al_complete_io(device
, &peer_req
->i
);
2761 drbd_may_finish_epoch(connection
, peer_req
->epoch
, EV_PUT
| EV_CLEANUP
);
2763 drbd_free_peer_req(device
, peer_req
);
2767 /* We may throttle resync, if the lower device seems to be busy,
2768 * and current sync rate is above c_min_rate.
2770 * To decide whether or not the lower device is busy, we use a scheme similar
2771 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2772 * (more than 64 sectors) of activity we cannot account for with our own resync
2773 * activity, it obviously is "busy".
2775 * The current sync rate used here uses only the most recent two step marks,
2776 * to have a short time average so we can react faster.
2778 bool drbd_rs_should_slow_down(struct drbd_device
*device
, sector_t sector
,
2779 bool throttle_if_app_is_waiting
)
2781 struct lc_element
*tmp
;
2782 bool throttle
= drbd_rs_c_min_rate_throttle(device
);
2784 if (!throttle
|| throttle_if_app_is_waiting
)
2787 spin_lock_irq(&device
->al_lock
);
2788 tmp
= lc_find(device
->resync
, BM_SECT_TO_EXT(sector
));
2790 struct bm_extent
*bm_ext
= lc_entry(tmp
, struct bm_extent
, lce
);
2791 if (test_bit(BME_PRIORITY
, &bm_ext
->flags
))
2793 /* Do not slow down if app IO is already waiting for this extent,
2794 * and our progress is necessary for application IO to complete. */
2796 spin_unlock_irq(&device
->al_lock
);
2801 bool drbd_rs_c_min_rate_throttle(struct drbd_device
*device
)
2803 struct gendisk
*disk
= device
->ldev
->backing_bdev
->bd_contains
->bd_disk
;
2804 unsigned long db
, dt
, dbdt
;
2805 unsigned int c_min_rate
;
2809 c_min_rate
= rcu_dereference(device
->ldev
->disk_conf
)->c_min_rate
;
2812 /* feature disabled? */
2813 if (c_min_rate
== 0)
2816 curr_events
= (int)part_stat_read_accum(&disk
->part0
, sectors
) -
2817 atomic_read(&device
->rs_sect_ev
);
2819 if (atomic_read(&device
->ap_actlog_cnt
)
2820 || curr_events
- device
->rs_last_events
> 64) {
2821 unsigned long rs_left
;
2824 device
->rs_last_events
= curr_events
;
2826 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2828 i
= (device
->rs_last_mark
+ DRBD_SYNC_MARKS
-1) % DRBD_SYNC_MARKS
;
2830 if (device
->state
.conn
== C_VERIFY_S
|| device
->state
.conn
== C_VERIFY_T
)
2831 rs_left
= device
->ov_left
;
2833 rs_left
= drbd_bm_total_weight(device
) - device
->rs_failed
;
2835 dt
= ((long)jiffies
- (long)device
->rs_mark_time
[i
]) / HZ
;
2838 db
= device
->rs_mark_left
[i
] - rs_left
;
2839 dbdt
= Bit2KB(db
/dt
);
2841 if (dbdt
> c_min_rate
)
2847 static int receive_DataRequest(struct drbd_connection
*connection
, struct packet_info
*pi
)
2849 struct drbd_peer_device
*peer_device
;
2850 struct drbd_device
*device
;
2853 struct drbd_peer_request
*peer_req
;
2854 struct digest_info
*di
= NULL
;
2856 unsigned int fault_type
;
2857 struct p_block_req
*p
= pi
->data
;
2859 peer_device
= conn_peer_device(connection
, pi
->vnr
);
2862 device
= peer_device
->device
;
2863 capacity
= drbd_get_capacity(device
->this_bdev
);
2865 sector
= be64_to_cpu(p
->sector
);
2866 size
= be32_to_cpu(p
->blksize
);
2868 if (size
<= 0 || !IS_ALIGNED(size
, 512) || size
> DRBD_MAX_BIO_SIZE
) {
2869 drbd_err(device
, "%s:%d: sector: %llus, size: %u\n", __FILE__
, __LINE__
,
2870 (unsigned long long)sector
, size
);
2873 if (sector
+ (size
>>9) > capacity
) {
2874 drbd_err(device
, "%s:%d: sector: %llus, size: %u\n", __FILE__
, __LINE__
,
2875 (unsigned long long)sector
, size
);
2879 if (!get_ldev_if_state(device
, D_UP_TO_DATE
)) {
2882 case P_DATA_REQUEST
:
2883 drbd_send_ack_rp(peer_device
, P_NEG_DREPLY
, p
);
2886 case P_RS_DATA_REQUEST
:
2887 case P_CSUM_RS_REQUEST
:
2889 drbd_send_ack_rp(peer_device
, P_NEG_RS_DREPLY
, p
);
2893 dec_rs_pending(device
);
2894 drbd_send_ack_ex(peer_device
, P_OV_RESULT
, sector
, size
, ID_IN_SYNC
);
2899 if (verb
&& __ratelimit(&drbd_ratelimit_state
))
2900 drbd_err(device
, "Can not satisfy peer's read request, "
2901 "no local data.\n");
2903 /* drain possibly payload */
2904 return drbd_drain_block(peer_device
, pi
->size
);
2907 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2908 * "criss-cross" setup, that might cause write-out on some other DRBD,
2909 * which in turn might block on the other node at this very place. */
2910 peer_req
= drbd_alloc_peer_req(peer_device
, p
->block_id
, sector
, size
,
2918 case P_DATA_REQUEST
:
2919 peer_req
->w
.cb
= w_e_end_data_req
;
2920 fault_type
= DRBD_FAULT_DT_RD
;
2921 /* application IO, don't drbd_rs_begin_io */
2922 peer_req
->flags
|= EE_APPLICATION
;
2926 /* If at some point in the future we have a smart way to
2927 find out if this data block is completely deallocated,
2928 then we would do something smarter here than reading
2930 peer_req
->flags
|= EE_RS_THIN_REQ
;
2932 case P_RS_DATA_REQUEST
:
2933 peer_req
->w
.cb
= w_e_end_rsdata_req
;
2934 fault_type
= DRBD_FAULT_RS_RD
;
2935 /* used in the sector offset progress display */
2936 device
->bm_resync_fo
= BM_SECT_TO_BIT(sector
);
2940 case P_CSUM_RS_REQUEST
:
2941 fault_type
= DRBD_FAULT_RS_RD
;
2942 di
= kmalloc(sizeof(*di
) + pi
->size
, GFP_NOIO
);
2946 di
->digest_size
= pi
->size
;
2947 di
->digest
= (((char *)di
)+sizeof(struct digest_info
));
2949 peer_req
->digest
= di
;
2950 peer_req
->flags
|= EE_HAS_DIGEST
;
2952 if (drbd_recv_all(peer_device
->connection
, di
->digest
, pi
->size
))
2955 if (pi
->cmd
== P_CSUM_RS_REQUEST
) {
2956 D_ASSERT(device
, peer_device
->connection
->agreed_pro_version
>= 89);
2957 peer_req
->w
.cb
= w_e_end_csum_rs_req
;
2958 /* used in the sector offset progress display */
2959 device
->bm_resync_fo
= BM_SECT_TO_BIT(sector
);
2960 /* remember to report stats in drbd_resync_finished */
2961 device
->use_csums
= true;
2962 } else if (pi
->cmd
== P_OV_REPLY
) {
2963 /* track progress, we may need to throttle */
2964 atomic_add(size
>> 9, &device
->rs_sect_in
);
2965 peer_req
->w
.cb
= w_e_end_ov_reply
;
2966 dec_rs_pending(device
);
2967 /* drbd_rs_begin_io done when we sent this request,
2968 * but accounting still needs to be done. */
2969 goto submit_for_resync
;
2974 if (device
->ov_start_sector
== ~(sector_t
)0 &&
2975 peer_device
->connection
->agreed_pro_version
>= 90) {
2976 unsigned long now
= jiffies
;
2978 device
->ov_start_sector
= sector
;
2979 device
->ov_position
= sector
;
2980 device
->ov_left
= drbd_bm_bits(device
) - BM_SECT_TO_BIT(sector
);
2981 device
->rs_total
= device
->ov_left
;
2982 for (i
= 0; i
< DRBD_SYNC_MARKS
; i
++) {
2983 device
->rs_mark_left
[i
] = device
->ov_left
;
2984 device
->rs_mark_time
[i
] = now
;
2986 drbd_info(device
, "Online Verify start sector: %llu\n",
2987 (unsigned long long)sector
);
2989 peer_req
->w
.cb
= w_e_end_ov_req
;
2990 fault_type
= DRBD_FAULT_RS_RD
;
2997 /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2998 * wrt the receiver, but it is not as straightforward as it may seem.
2999 * Various places in the resync start and stop logic assume resync
3000 * requests are processed in order, requeuing this on the worker thread
3001 * introduces a bunch of new code for synchronization between threads.
3003 * Unlimited throttling before drbd_rs_begin_io may stall the resync
3004 * "forever", throttling after drbd_rs_begin_io will lock that extent
3005 * for application writes for the same time. For now, just throttle
3006 * here, where the rest of the code expects the receiver to sleep for
3010 /* Throttle before drbd_rs_begin_io, as that locks out application IO;
3011 * this defers syncer requests for some time, before letting at least
3012 * on request through. The resync controller on the receiving side
3013 * will adapt to the incoming rate accordingly.
3015 * We cannot throttle here if remote is Primary/SyncTarget:
3016 * we would also throttle its application reads.
3017 * In that case, throttling is done on the SyncTarget only.
3020 /* Even though this may be a resync request, we do add to "read_ee";
3021 * "sync_ee" is only used for resync WRITEs.
3022 * Add to list early, so debugfs can find this request
3023 * even if we have to sleep below. */
3024 spin_lock_irq(&device
->resource
->req_lock
);
3025 list_add_tail(&peer_req
->w
.list
, &device
->read_ee
);
3026 spin_unlock_irq(&device
->resource
->req_lock
);
3028 update_receiver_timing_details(connection
, drbd_rs_should_slow_down
);
3029 if (device
->state
.peer
!= R_PRIMARY
3030 && drbd_rs_should_slow_down(device
, sector
, false))
3031 schedule_timeout_uninterruptible(HZ
/10);
3032 update_receiver_timing_details(connection
, drbd_rs_begin_io
);
3033 if (drbd_rs_begin_io(device
, sector
))
3037 atomic_add(size
>> 9, &device
->rs_sect_ev
);
3040 update_receiver_timing_details(connection
, drbd_submit_peer_request
);
3041 inc_unacked(device
);
3042 if (drbd_submit_peer_request(device
, peer_req
, REQ_OP_READ
, 0,
3046 /* don't care for the reason here */
3047 drbd_err(device
, "submit failed, triggering re-connect\n");
3050 spin_lock_irq(&device
->resource
->req_lock
);
3051 list_del(&peer_req
->w
.list
);
3052 spin_unlock_irq(&device
->resource
->req_lock
);
3053 /* no drbd_rs_complete_io(), we are dropping the connection anyways */
3056 drbd_free_peer_req(device
, peer_req
);
3061 * drbd_asb_recover_0p - Recover after split-brain with no remaining primaries
3063 static int drbd_asb_recover_0p(struct drbd_peer_device
*peer_device
) __must_hold(local
)
3065 struct drbd_device
*device
= peer_device
->device
;
3066 int self
, peer
, rv
= -100;
3067 unsigned long ch_self
, ch_peer
;
3068 enum drbd_after_sb_p after_sb_0p
;
3070 self
= device
->ldev
->md
.uuid
[UI_BITMAP
] & 1;
3071 peer
= device
->p_uuid
[UI_BITMAP
] & 1;
3073 ch_peer
= device
->p_uuid
[UI_SIZE
];
3074 ch_self
= device
->comm_bm_set
;
3077 after_sb_0p
= rcu_dereference(peer_device
->connection
->net_conf
)->after_sb_0p
;
3079 switch (after_sb_0p
) {
3081 case ASB_DISCARD_SECONDARY
:
3082 case ASB_CALL_HELPER
:
3084 drbd_err(device
, "Configuration error.\n");
3086 case ASB_DISCONNECT
:
3088 case ASB_DISCARD_YOUNGER_PRI
:
3089 if (self
== 0 && peer
== 1) {
3093 if (self
== 1 && peer
== 0) {
3097 /* Else fall through to one of the other strategies... */
3098 case ASB_DISCARD_OLDER_PRI
:
3099 if (self
== 0 && peer
== 1) {
3103 if (self
== 1 && peer
== 0) {
3107 /* Else fall through to one of the other strategies... */
3108 drbd_warn(device
, "Discard younger/older primary did not find a decision\n"
3109 "Using discard-least-changes instead\n");
3111 case ASB_DISCARD_ZERO_CHG
:
3112 if (ch_peer
== 0 && ch_self
== 0) {
3113 rv
= test_bit(RESOLVE_CONFLICTS
, &peer_device
->connection
->flags
)
3117 if (ch_peer
== 0) { rv
= 1; break; }
3118 if (ch_self
== 0) { rv
= -1; break; }
3120 if (after_sb_0p
== ASB_DISCARD_ZERO_CHG
)
3122 /* else: fall through */
3123 case ASB_DISCARD_LEAST_CHG
:
3124 if (ch_self
< ch_peer
)
3126 else if (ch_self
> ch_peer
)
3128 else /* ( ch_self == ch_peer ) */
3129 /* Well, then use something else. */
3130 rv
= test_bit(RESOLVE_CONFLICTS
, &peer_device
->connection
->flags
)
3133 case ASB_DISCARD_LOCAL
:
3136 case ASB_DISCARD_REMOTE
:
3144 * drbd_asb_recover_1p - Recover after split-brain with one remaining primary
3146 static int drbd_asb_recover_1p(struct drbd_peer_device
*peer_device
) __must_hold(local
)
3148 struct drbd_device
*device
= peer_device
->device
;
3150 enum drbd_after_sb_p after_sb_1p
;
3153 after_sb_1p
= rcu_dereference(peer_device
->connection
->net_conf
)->after_sb_1p
;
3155 switch (after_sb_1p
) {
3156 case ASB_DISCARD_YOUNGER_PRI
:
3157 case ASB_DISCARD_OLDER_PRI
:
3158 case ASB_DISCARD_LEAST_CHG
:
3159 case ASB_DISCARD_LOCAL
:
3160 case ASB_DISCARD_REMOTE
:
3161 case ASB_DISCARD_ZERO_CHG
:
3162 drbd_err(device
, "Configuration error.\n");
3164 case ASB_DISCONNECT
:
3167 hg
= drbd_asb_recover_0p(peer_device
);
3168 if (hg
== -1 && device
->state
.role
== R_SECONDARY
)
3170 if (hg
== 1 && device
->state
.role
== R_PRIMARY
)
3174 rv
= drbd_asb_recover_0p(peer_device
);
3176 case ASB_DISCARD_SECONDARY
:
3177 return device
->state
.role
== R_PRIMARY
? 1 : -1;
3178 case ASB_CALL_HELPER
:
3179 hg
= drbd_asb_recover_0p(peer_device
);
3180 if (hg
== -1 && device
->state
.role
== R_PRIMARY
) {
3181 enum drbd_state_rv rv2
;
3183 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3184 * we might be here in C_WF_REPORT_PARAMS which is transient.
3185 * we do not need to wait for the after state change work either. */
3186 rv2
= drbd_change_state(device
, CS_VERBOSE
, NS(role
, R_SECONDARY
));
3187 if (rv2
!= SS_SUCCESS
) {
3188 drbd_khelper(device
, "pri-lost-after-sb");
3190 drbd_warn(device
, "Successfully gave up primary role.\n");
3201 * drbd_asb_recover_2p - Recover after split-brain with two remaining primaries
3203 static int drbd_asb_recover_2p(struct drbd_peer_device
*peer_device
) __must_hold(local
)
3205 struct drbd_device
*device
= peer_device
->device
;
3207 enum drbd_after_sb_p after_sb_2p
;
3210 after_sb_2p
= rcu_dereference(peer_device
->connection
->net_conf
)->after_sb_2p
;
3212 switch (after_sb_2p
) {
3213 case ASB_DISCARD_YOUNGER_PRI
:
3214 case ASB_DISCARD_OLDER_PRI
:
3215 case ASB_DISCARD_LEAST_CHG
:
3216 case ASB_DISCARD_LOCAL
:
3217 case ASB_DISCARD_REMOTE
:
3219 case ASB_DISCARD_SECONDARY
:
3220 case ASB_DISCARD_ZERO_CHG
:
3221 drbd_err(device
, "Configuration error.\n");
3224 rv
= drbd_asb_recover_0p(peer_device
);
3226 case ASB_DISCONNECT
:
3228 case ASB_CALL_HELPER
:
3229 hg
= drbd_asb_recover_0p(peer_device
);
3231 enum drbd_state_rv rv2
;
3233 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3234 * we might be here in C_WF_REPORT_PARAMS which is transient.
3235 * we do not need to wait for the after state change work either. */
3236 rv2
= drbd_change_state(device
, CS_VERBOSE
, NS(role
, R_SECONDARY
));
3237 if (rv2
!= SS_SUCCESS
) {
3238 drbd_khelper(device
, "pri-lost-after-sb");
3240 drbd_warn(device
, "Successfully gave up primary role.\n");
3250 static void drbd_uuid_dump(struct drbd_device
*device
, char *text
, u64
*uuid
,
3251 u64 bits
, u64 flags
)
3254 drbd_info(device
, "%s uuid info vanished while I was looking!\n", text
);
3257 drbd_info(device
, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
3259 (unsigned long long)uuid
[UI_CURRENT
],
3260 (unsigned long long)uuid
[UI_BITMAP
],
3261 (unsigned long long)uuid
[UI_HISTORY_START
],
3262 (unsigned long long)uuid
[UI_HISTORY_END
],
3263 (unsigned long long)bits
,
3264 (unsigned long long)flags
);
3268 100 after split brain try auto recover
3269 2 C_SYNC_SOURCE set BitMap
3270 1 C_SYNC_SOURCE use BitMap
3272 -1 C_SYNC_TARGET use BitMap
3273 -2 C_SYNC_TARGET set BitMap
3274 -100 after split brain, disconnect
3275 -1000 unrelated data
3276 -1091 requires proto 91
3277 -1096 requires proto 96
3280 static int drbd_uuid_compare(struct drbd_device
*const device
, enum drbd_role
const peer_role
, int *rule_nr
) __must_hold(local
)
3282 struct drbd_peer_device
*const peer_device
= first_peer_device(device
);
3283 struct drbd_connection
*const connection
= peer_device
? peer_device
->connection
: NULL
;
3287 self
= device
->ldev
->md
.uuid
[UI_CURRENT
] & ~((u64
)1);
3288 peer
= device
->p_uuid
[UI_CURRENT
] & ~((u64
)1);
3291 if (self
== UUID_JUST_CREATED
&& peer
== UUID_JUST_CREATED
)
3295 if ((self
== UUID_JUST_CREATED
|| self
== (u64
)0) &&
3296 peer
!= UUID_JUST_CREATED
)
3300 if (self
!= UUID_JUST_CREATED
&&
3301 (peer
== UUID_JUST_CREATED
|| peer
== (u64
)0))
3305 int rct
, dc
; /* roles at crash time */
3307 if (device
->p_uuid
[UI_BITMAP
] == (u64
)0 && device
->ldev
->md
.uuid
[UI_BITMAP
] != (u64
)0) {
3309 if (connection
->agreed_pro_version
< 91)
3312 if ((device
->ldev
->md
.uuid
[UI_BITMAP
] & ~((u64
)1)) == (device
->p_uuid
[UI_HISTORY_START
] & ~((u64
)1)) &&
3313 (device
->ldev
->md
.uuid
[UI_HISTORY_START
] & ~((u64
)1)) == (device
->p_uuid
[UI_HISTORY_START
+ 1] & ~((u64
)1))) {
3314 drbd_info(device
, "was SyncSource, missed the resync finished event, corrected myself:\n");
3315 drbd_uuid_move_history(device
);
3316 device
->ldev
->md
.uuid
[UI_HISTORY_START
] = device
->ldev
->md
.uuid
[UI_BITMAP
];
3317 device
->ldev
->md
.uuid
[UI_BITMAP
] = 0;
3319 drbd_uuid_dump(device
, "self", device
->ldev
->md
.uuid
,
3320 device
->state
.disk
>= D_NEGOTIATING
? drbd_bm_total_weight(device
) : 0, 0);
3323 drbd_info(device
, "was SyncSource (peer failed to write sync_uuid)\n");
3330 if (device
->ldev
->md
.uuid
[UI_BITMAP
] == (u64
)0 && device
->p_uuid
[UI_BITMAP
] != (u64
)0) {
3332 if (connection
->agreed_pro_version
< 91)
3335 if ((device
->ldev
->md
.uuid
[UI_HISTORY_START
] & ~((u64
)1)) == (device
->p_uuid
[UI_BITMAP
] & ~((u64
)1)) &&
3336 (device
->ldev
->md
.uuid
[UI_HISTORY_START
+ 1] & ~((u64
)1)) == (device
->p_uuid
[UI_HISTORY_START
] & ~((u64
)1))) {
3337 drbd_info(device
, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
3339 device
->p_uuid
[UI_HISTORY_START
+ 1] = device
->p_uuid
[UI_HISTORY_START
];
3340 device
->p_uuid
[UI_HISTORY_START
] = device
->p_uuid
[UI_BITMAP
];
3341 device
->p_uuid
[UI_BITMAP
] = 0UL;
3343 drbd_uuid_dump(device
, "peer", device
->p_uuid
, device
->p_uuid
[UI_SIZE
], device
->p_uuid
[UI_FLAGS
]);
3346 drbd_info(device
, "was SyncTarget (failed to write sync_uuid)\n");
3353 /* Common power [off|failure] */
3354 rct
= (test_bit(CRASHED_PRIMARY
, &device
->flags
) ? 1 : 0) +
3355 (device
->p_uuid
[UI_FLAGS
] & 2);
3356 /* lowest bit is set when we were primary,
3357 * next bit (weight 2) is set when peer was primary */
3360 /* Neither has the "crashed primary" flag set,
3361 * only a replication link hickup. */
3365 /* Current UUID equal and no bitmap uuid; does not necessarily
3366 * mean this was a "simultaneous hard crash", maybe IO was
3367 * frozen, so no UUID-bump happened.
3368 * This is a protocol change, overload DRBD_FF_WSAME as flag
3369 * for "new-enough" peer DRBD version. */
3370 if (device
->state
.role
== R_PRIMARY
|| peer_role
== R_PRIMARY
) {
3372 if (!(connection
->agreed_features
& DRBD_FF_WSAME
)) {
3373 drbd_warn(peer_device
, "Equivalent unrotated UUIDs, but current primary present.\n");
3374 return -(0x10000 | PRO_VERSION_MAX
| (DRBD_FF_WSAME
<< 8));
3376 if (device
->state
.role
== R_PRIMARY
&& peer_role
== R_PRIMARY
) {
3377 /* At least one has the "crashed primary" bit set,
3378 * both are primary now, but neither has rotated its UUIDs?
3379 * "Can not happen." */
3380 drbd_err(peer_device
, "Equivalent unrotated UUIDs, but both are primary. Can not resolve this.\n");
3383 if (device
->state
.role
== R_PRIMARY
)
3388 /* Both are secondary.
3389 * Really looks like recovery from simultaneous hard crash.
3390 * Check which had been primary before, and arbitrate. */
3392 case 0: /* !self_pri && !peer_pri */ return 0; /* already handled */
3393 case 1: /* self_pri && !peer_pri */ return 1;
3394 case 2: /* !self_pri && peer_pri */ return -1;
3395 case 3: /* self_pri && peer_pri */
3396 dc
= test_bit(RESOLVE_CONFLICTS
, &connection
->flags
);
3402 peer
= device
->p_uuid
[UI_BITMAP
] & ~((u64
)1);
3407 peer
= device
->p_uuid
[UI_HISTORY_START
] & ~((u64
)1);
3409 if (connection
->agreed_pro_version
< 96 ?
3410 (device
->ldev
->md
.uuid
[UI_HISTORY_START
] & ~((u64
)1)) ==
3411 (device
->p_uuid
[UI_HISTORY_START
+ 1] & ~((u64
)1)) :
3412 peer
+ UUID_NEW_BM_OFFSET
== (device
->p_uuid
[UI_BITMAP
] & ~((u64
)1))) {
3413 /* The last P_SYNC_UUID did not get though. Undo the last start of
3414 resync as sync source modifications of the peer's UUIDs. */
3416 if (connection
->agreed_pro_version
< 91)
3419 device
->p_uuid
[UI_BITMAP
] = device
->p_uuid
[UI_HISTORY_START
];
3420 device
->p_uuid
[UI_HISTORY_START
] = device
->p_uuid
[UI_HISTORY_START
+ 1];
3422 drbd_info(device
, "Lost last syncUUID packet, corrected:\n");
3423 drbd_uuid_dump(device
, "peer", device
->p_uuid
, device
->p_uuid
[UI_SIZE
], device
->p_uuid
[UI_FLAGS
]);
3430 self
= device
->ldev
->md
.uuid
[UI_CURRENT
] & ~((u64
)1);
3431 for (i
= UI_HISTORY_START
; i
<= UI_HISTORY_END
; i
++) {
3432 peer
= device
->p_uuid
[i
] & ~((u64
)1);
3438 self
= device
->ldev
->md
.uuid
[UI_BITMAP
] & ~((u64
)1);
3439 peer
= device
->p_uuid
[UI_CURRENT
] & ~((u64
)1);
3444 self
= device
->ldev
->md
.uuid
[UI_HISTORY_START
] & ~((u64
)1);
3446 if (connection
->agreed_pro_version
< 96 ?
3447 (device
->ldev
->md
.uuid
[UI_HISTORY_START
+ 1] & ~((u64
)1)) ==
3448 (device
->p_uuid
[UI_HISTORY_START
] & ~((u64
)1)) :
3449 self
+ UUID_NEW_BM_OFFSET
== (device
->ldev
->md
.uuid
[UI_BITMAP
] & ~((u64
)1))) {
3450 /* The last P_SYNC_UUID did not get though. Undo the last start of
3451 resync as sync source modifications of our UUIDs. */
3453 if (connection
->agreed_pro_version
< 91)
3456 __drbd_uuid_set(device
, UI_BITMAP
, device
->ldev
->md
.uuid
[UI_HISTORY_START
]);
3457 __drbd_uuid_set(device
, UI_HISTORY_START
, device
->ldev
->md
.uuid
[UI_HISTORY_START
+ 1]);
3459 drbd_info(device
, "Last syncUUID did not get through, corrected:\n");
3460 drbd_uuid_dump(device
, "self", device
->ldev
->md
.uuid
,
3461 device
->state
.disk
>= D_NEGOTIATING
? drbd_bm_total_weight(device
) : 0, 0);
3469 peer
= device
->p_uuid
[UI_CURRENT
] & ~((u64
)1);
3470 for (i
= UI_HISTORY_START
; i
<= UI_HISTORY_END
; i
++) {
3471 self
= device
->ldev
->md
.uuid
[i
] & ~((u64
)1);
3477 self
= device
->ldev
->md
.uuid
[UI_BITMAP
] & ~((u64
)1);
3478 peer
= device
->p_uuid
[UI_BITMAP
] & ~((u64
)1);
3479 if (self
== peer
&& self
!= ((u64
)0))
3483 for (i
= UI_HISTORY_START
; i
<= UI_HISTORY_END
; i
++) {
3484 self
= device
->ldev
->md
.uuid
[i
] & ~((u64
)1);
3485 for (j
= UI_HISTORY_START
; j
<= UI_HISTORY_END
; j
++) {
3486 peer
= device
->p_uuid
[j
] & ~((u64
)1);
3495 /* drbd_sync_handshake() returns the new conn state on success, or
3496 CONN_MASK (-1) on failure.
3498 static enum drbd_conns
drbd_sync_handshake(struct drbd_peer_device
*peer_device
,
3499 enum drbd_role peer_role
,
3500 enum drbd_disk_state peer_disk
) __must_hold(local
)
3502 struct drbd_device
*device
= peer_device
->device
;
3503 enum drbd_conns rv
= C_MASK
;
3504 enum drbd_disk_state mydisk
;
3505 struct net_conf
*nc
;
3506 int hg
, rule_nr
, rr_conflict
, tentative
, always_asbp
;
3508 mydisk
= device
->state
.disk
;
3509 if (mydisk
== D_NEGOTIATING
)
3510 mydisk
= device
->new_state_tmp
.disk
;
3512 drbd_info(device
, "drbd_sync_handshake:\n");
3514 spin_lock_irq(&device
->ldev
->md
.uuid_lock
);
3515 drbd_uuid_dump(device
, "self", device
->ldev
->md
.uuid
, device
->comm_bm_set
, 0);
3516 drbd_uuid_dump(device
, "peer", device
->p_uuid
,
3517 device
->p_uuid
[UI_SIZE
], device
->p_uuid
[UI_FLAGS
]);
3519 hg
= drbd_uuid_compare(device
, peer_role
, &rule_nr
);
3520 spin_unlock_irq(&device
->ldev
->md
.uuid_lock
);
3522 drbd_info(device
, "uuid_compare()=%d by rule %d\n", hg
, rule_nr
);
3525 drbd_alert(device
, "Unrelated data, aborting!\n");
3528 if (hg
< -0x10000) {
3532 fflags
= (hg
>> 8) & 0xff;
3533 drbd_alert(device
, "To resolve this both sides have to support at least protocol %d and feature flags 0x%x\n",
3538 drbd_alert(device
, "To resolve this both sides have to support at least protocol %d\n", -hg
- 1000);
3542 if ((mydisk
== D_INCONSISTENT
&& peer_disk
> D_INCONSISTENT
) ||
3543 (peer_disk
== D_INCONSISTENT
&& mydisk
> D_INCONSISTENT
)) {
3544 int f
= (hg
== -100) || abs(hg
) == 2;
3545 hg
= mydisk
> D_INCONSISTENT
? 1 : -1;
3548 drbd_info(device
, "Becoming sync %s due to disk states.\n",
3549 hg
> 0 ? "source" : "target");
3553 drbd_khelper(device
, "initial-split-brain");
3556 nc
= rcu_dereference(peer_device
->connection
->net_conf
);
3557 always_asbp
= nc
->always_asbp
;
3558 rr_conflict
= nc
->rr_conflict
;
3559 tentative
= nc
->tentative
;
3562 if (hg
== 100 || (hg
== -100 && always_asbp
)) {
3563 int pcount
= (device
->state
.role
== R_PRIMARY
)
3564 + (peer_role
== R_PRIMARY
);
3565 int forced
= (hg
== -100);
3569 hg
= drbd_asb_recover_0p(peer_device
);
3572 hg
= drbd_asb_recover_1p(peer_device
);
3575 hg
= drbd_asb_recover_2p(peer_device
);
3578 if (abs(hg
) < 100) {
3579 drbd_warn(device
, "Split-Brain detected, %d primaries, "
3580 "automatically solved. Sync from %s node\n",
3581 pcount
, (hg
< 0) ? "peer" : "this");
3583 drbd_warn(device
, "Doing a full sync, since"
3584 " UUIDs where ambiguous.\n");
3591 if (test_bit(DISCARD_MY_DATA
, &device
->flags
) && !(device
->p_uuid
[UI_FLAGS
]&1))
3593 if (!test_bit(DISCARD_MY_DATA
, &device
->flags
) && (device
->p_uuid
[UI_FLAGS
]&1))
3597 drbd_warn(device
, "Split-Brain detected, manually solved. "
3598 "Sync from %s node\n",
3599 (hg
< 0) ? "peer" : "this");
3603 /* FIXME this log message is not correct if we end up here
3604 * after an attempted attach on a diskless node.
3605 * We just refuse to attach -- well, we drop the "connection"
3606 * to that disk, in a way... */
3607 drbd_alert(device
, "Split-Brain detected but unresolved, dropping connection!\n");
3608 drbd_khelper(device
, "split-brain");
3612 if (hg
> 0 && mydisk
<= D_INCONSISTENT
) {
3613 drbd_err(device
, "I shall become SyncSource, but I am inconsistent!\n");
3617 if (hg
< 0 && /* by intention we do not use mydisk here. */
3618 device
->state
.role
== R_PRIMARY
&& device
->state
.disk
>= D_CONSISTENT
) {
3619 switch (rr_conflict
) {
3620 case ASB_CALL_HELPER
:
3621 drbd_khelper(device
, "pri-lost");
3623 case ASB_DISCONNECT
:
3624 drbd_err(device
, "I shall become SyncTarget, but I am primary!\n");
3627 drbd_warn(device
, "Becoming SyncTarget, violating the stable-data"
3632 if (tentative
|| test_bit(CONN_DRY_RUN
, &peer_device
->connection
->flags
)) {
3634 drbd_info(device
, "dry-run connect: No resync, would become Connected immediately.\n");
3636 drbd_info(device
, "dry-run connect: Would become %s, doing a %s resync.",
3637 drbd_conn_str(hg
> 0 ? C_SYNC_SOURCE
: C_SYNC_TARGET
),
3638 abs(hg
) >= 2 ? "full" : "bit-map based");
3643 drbd_info(device
, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3644 if (drbd_bitmap_io(device
, &drbd_bmio_set_n_write
, "set_n_write from sync_handshake",
3645 BM_LOCKED_SET_ALLOWED
))
3649 if (hg
> 0) { /* become sync source. */
3651 } else if (hg
< 0) { /* become sync target */
3655 if (drbd_bm_total_weight(device
)) {
3656 drbd_info(device
, "No resync, but %lu bits in bitmap!\n",
3657 drbd_bm_total_weight(device
));
3664 static enum drbd_after_sb_p
convert_after_sb(enum drbd_after_sb_p peer
)
3666 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3667 if (peer
== ASB_DISCARD_REMOTE
)
3668 return ASB_DISCARD_LOCAL
;
3670 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3671 if (peer
== ASB_DISCARD_LOCAL
)
3672 return ASB_DISCARD_REMOTE
;
3674 /* everything else is valid if they are equal on both sides. */
3678 static int receive_protocol(struct drbd_connection
*connection
, struct packet_info
*pi
)
3680 struct p_protocol
*p
= pi
->data
;
3681 enum drbd_after_sb_p p_after_sb_0p
, p_after_sb_1p
, p_after_sb_2p
;
3682 int p_proto
, p_discard_my_data
, p_two_primaries
, cf
;
3683 struct net_conf
*nc
, *old_net_conf
, *new_net_conf
= NULL
;
3684 char integrity_alg
[SHARED_SECRET_MAX
] = "";
3685 struct crypto_shash
*peer_integrity_tfm
= NULL
;
3686 void *int_dig_in
= NULL
, *int_dig_vv
= NULL
;
3688 p_proto
= be32_to_cpu(p
->protocol
);
3689 p_after_sb_0p
= be32_to_cpu(p
->after_sb_0p
);
3690 p_after_sb_1p
= be32_to_cpu(p
->after_sb_1p
);
3691 p_after_sb_2p
= be32_to_cpu(p
->after_sb_2p
);
3692 p_two_primaries
= be32_to_cpu(p
->two_primaries
);
3693 cf
= be32_to_cpu(p
->conn_flags
);
3694 p_discard_my_data
= cf
& CF_DISCARD_MY_DATA
;
3696 if (connection
->agreed_pro_version
>= 87) {
3699 if (pi
->size
> sizeof(integrity_alg
))
3701 err
= drbd_recv_all(connection
, integrity_alg
, pi
->size
);
3704 integrity_alg
[SHARED_SECRET_MAX
- 1] = 0;
3707 if (pi
->cmd
!= P_PROTOCOL_UPDATE
) {
3708 clear_bit(CONN_DRY_RUN
, &connection
->flags
);
3710 if (cf
& CF_DRY_RUN
)
3711 set_bit(CONN_DRY_RUN
, &connection
->flags
);
3714 nc
= rcu_dereference(connection
->net_conf
);
3716 if (p_proto
!= nc
->wire_protocol
) {
3717 drbd_err(connection
, "incompatible %s settings\n", "protocol");
3718 goto disconnect_rcu_unlock
;
3721 if (convert_after_sb(p_after_sb_0p
) != nc
->after_sb_0p
) {
3722 drbd_err(connection
, "incompatible %s settings\n", "after-sb-0pri");
3723 goto disconnect_rcu_unlock
;
3726 if (convert_after_sb(p_after_sb_1p
) != nc
->after_sb_1p
) {
3727 drbd_err(connection
, "incompatible %s settings\n", "after-sb-1pri");
3728 goto disconnect_rcu_unlock
;
3731 if (convert_after_sb(p_after_sb_2p
) != nc
->after_sb_2p
) {
3732 drbd_err(connection
, "incompatible %s settings\n", "after-sb-2pri");
3733 goto disconnect_rcu_unlock
;
3736 if (p_discard_my_data
&& nc
->discard_my_data
) {
3737 drbd_err(connection
, "incompatible %s settings\n", "discard-my-data");
3738 goto disconnect_rcu_unlock
;
3741 if (p_two_primaries
!= nc
->two_primaries
) {
3742 drbd_err(connection
, "incompatible %s settings\n", "allow-two-primaries");
3743 goto disconnect_rcu_unlock
;
3746 if (strcmp(integrity_alg
, nc
->integrity_alg
)) {
3747 drbd_err(connection
, "incompatible %s settings\n", "data-integrity-alg");
3748 goto disconnect_rcu_unlock
;
3754 if (integrity_alg
[0]) {
3758 * We can only change the peer data integrity algorithm
3759 * here. Changing our own data integrity algorithm
3760 * requires that we send a P_PROTOCOL_UPDATE packet at
3761 * the same time; otherwise, the peer has no way to
3762 * tell between which packets the algorithm should
3766 peer_integrity_tfm
= crypto_alloc_shash(integrity_alg
, 0, 0);
3767 if (IS_ERR(peer_integrity_tfm
)) {
3768 peer_integrity_tfm
= NULL
;
3769 drbd_err(connection
, "peer data-integrity-alg %s not supported\n",
3774 hash_size
= crypto_shash_digestsize(peer_integrity_tfm
);
3775 int_dig_in
= kmalloc(hash_size
, GFP_KERNEL
);
3776 int_dig_vv
= kmalloc(hash_size
, GFP_KERNEL
);
3777 if (!(int_dig_in
&& int_dig_vv
)) {
3778 drbd_err(connection
, "Allocation of buffers for data integrity checking failed\n");
3783 new_net_conf
= kmalloc(sizeof(struct net_conf
), GFP_KERNEL
);
3784 if (!new_net_conf
) {
3785 drbd_err(connection
, "Allocation of new net_conf failed\n");
3789 mutex_lock(&connection
->data
.mutex
);
3790 mutex_lock(&connection
->resource
->conf_update
);
3791 old_net_conf
= connection
->net_conf
;
3792 *new_net_conf
= *old_net_conf
;
3794 new_net_conf
->wire_protocol
= p_proto
;
3795 new_net_conf
->after_sb_0p
= convert_after_sb(p_after_sb_0p
);
3796 new_net_conf
->after_sb_1p
= convert_after_sb(p_after_sb_1p
);
3797 new_net_conf
->after_sb_2p
= convert_after_sb(p_after_sb_2p
);
3798 new_net_conf
->two_primaries
= p_two_primaries
;
3800 rcu_assign_pointer(connection
->net_conf
, new_net_conf
);
3801 mutex_unlock(&connection
->resource
->conf_update
);
3802 mutex_unlock(&connection
->data
.mutex
);
3804 crypto_free_shash(connection
->peer_integrity_tfm
);
3805 kfree(connection
->int_dig_in
);
3806 kfree(connection
->int_dig_vv
);
3807 connection
->peer_integrity_tfm
= peer_integrity_tfm
;
3808 connection
->int_dig_in
= int_dig_in
;
3809 connection
->int_dig_vv
= int_dig_vv
;
3811 if (strcmp(old_net_conf
->integrity_alg
, integrity_alg
))
3812 drbd_info(connection
, "peer data-integrity-alg: %s\n",
3813 integrity_alg
[0] ? integrity_alg
: "(none)");
3816 kfree(old_net_conf
);
3819 disconnect_rcu_unlock
:
3822 crypto_free_shash(peer_integrity_tfm
);
3825 conn_request_state(connection
, NS(conn
, C_DISCONNECTING
), CS_HARD
);
3830 * input: alg name, feature name
3831 * return: NULL (alg name was "")
3832 * ERR_PTR(error) if something goes wrong
3833 * or the crypto hash ptr, if it worked out ok. */
3834 static struct crypto_shash
*drbd_crypto_alloc_digest_safe(
3835 const struct drbd_device
*device
,
3836 const char *alg
, const char *name
)
3838 struct crypto_shash
*tfm
;
3843 tfm
= crypto_alloc_shash(alg
, 0, 0);
3845 drbd_err(device
, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3846 alg
, name
, PTR_ERR(tfm
));
3852 static int ignore_remaining_packet(struct drbd_connection
*connection
, struct packet_info
*pi
)
3854 void *buffer
= connection
->data
.rbuf
;
3855 int size
= pi
->size
;
3858 int s
= min_t(int, size
, DRBD_SOCKET_BUFFER_SIZE
);
3859 s
= drbd_recv(connection
, buffer
, s
);
3873 * config_unknown_volume - device configuration command for unknown volume
3875 * When a device is added to an existing connection, the node on which the
3876 * device is added first will send configuration commands to its peer but the
3877 * peer will not know about the device yet. It will warn and ignore these
3878 * commands. Once the device is added on the second node, the second node will
3879 * send the same device configuration commands, but in the other direction.
3881 * (We can also end up here if drbd is misconfigured.)
3883 static int config_unknown_volume(struct drbd_connection
*connection
, struct packet_info
*pi
)
3885 drbd_warn(connection
, "%s packet received for volume %u, which is not configured locally\n",
3886 cmdname(pi
->cmd
), pi
->vnr
);
3887 return ignore_remaining_packet(connection
, pi
);
3890 static int receive_SyncParam(struct drbd_connection
*connection
, struct packet_info
*pi
)
3892 struct drbd_peer_device
*peer_device
;
3893 struct drbd_device
*device
;
3894 struct p_rs_param_95
*p
;
3895 unsigned int header_size
, data_size
, exp_max_sz
;
3896 struct crypto_shash
*verify_tfm
= NULL
;
3897 struct crypto_shash
*csums_tfm
= NULL
;
3898 struct net_conf
*old_net_conf
, *new_net_conf
= NULL
;
3899 struct disk_conf
*old_disk_conf
= NULL
, *new_disk_conf
= NULL
;
3900 const int apv
= connection
->agreed_pro_version
;
3901 struct fifo_buffer
*old_plan
= NULL
, *new_plan
= NULL
;
3905 peer_device
= conn_peer_device(connection
, pi
->vnr
);
3907 return config_unknown_volume(connection
, pi
);
3908 device
= peer_device
->device
;
3910 exp_max_sz
= apv
<= 87 ? sizeof(struct p_rs_param
)
3911 : apv
== 88 ? sizeof(struct p_rs_param
)
3913 : apv
<= 94 ? sizeof(struct p_rs_param_89
)
3914 : /* apv >= 95 */ sizeof(struct p_rs_param_95
);
3916 if (pi
->size
> exp_max_sz
) {
3917 drbd_err(device
, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3918 pi
->size
, exp_max_sz
);
3923 header_size
= sizeof(struct p_rs_param
);
3924 data_size
= pi
->size
- header_size
;
3925 } else if (apv
<= 94) {
3926 header_size
= sizeof(struct p_rs_param_89
);
3927 data_size
= pi
->size
- header_size
;
3928 D_ASSERT(device
, data_size
== 0);
3930 header_size
= sizeof(struct p_rs_param_95
);
3931 data_size
= pi
->size
- header_size
;
3932 D_ASSERT(device
, data_size
== 0);
3935 /* initialize verify_alg and csums_alg */
3937 memset(p
->verify_alg
, 0, 2 * SHARED_SECRET_MAX
);
3939 err
= drbd_recv_all(peer_device
->connection
, p
, header_size
);
3943 mutex_lock(&connection
->resource
->conf_update
);
3944 old_net_conf
= peer_device
->connection
->net_conf
;
3945 if (get_ldev(device
)) {
3946 new_disk_conf
= kzalloc(sizeof(struct disk_conf
), GFP_KERNEL
);
3947 if (!new_disk_conf
) {
3949 mutex_unlock(&connection
->resource
->conf_update
);
3950 drbd_err(device
, "Allocation of new disk_conf failed\n");
3954 old_disk_conf
= device
->ldev
->disk_conf
;
3955 *new_disk_conf
= *old_disk_conf
;
3957 new_disk_conf
->resync_rate
= be32_to_cpu(p
->resync_rate
);
3962 if (data_size
> SHARED_SECRET_MAX
|| data_size
== 0) {
3963 drbd_err(device
, "verify-alg of wrong size, "
3964 "peer wants %u, accepting only up to %u byte\n",
3965 data_size
, SHARED_SECRET_MAX
);
3970 err
= drbd_recv_all(peer_device
->connection
, p
->verify_alg
, data_size
);
3973 /* we expect NUL terminated string */
3974 /* but just in case someone tries to be evil */
3975 D_ASSERT(device
, p
->verify_alg
[data_size
-1] == 0);
3976 p
->verify_alg
[data_size
-1] = 0;
3978 } else /* apv >= 89 */ {
3979 /* we still expect NUL terminated strings */
3980 /* but just in case someone tries to be evil */
3981 D_ASSERT(device
, p
->verify_alg
[SHARED_SECRET_MAX
-1] == 0);
3982 D_ASSERT(device
, p
->csums_alg
[SHARED_SECRET_MAX
-1] == 0);
3983 p
->verify_alg
[SHARED_SECRET_MAX
-1] = 0;
3984 p
->csums_alg
[SHARED_SECRET_MAX
-1] = 0;
3987 if (strcmp(old_net_conf
->verify_alg
, p
->verify_alg
)) {
3988 if (device
->state
.conn
== C_WF_REPORT_PARAMS
) {
3989 drbd_err(device
, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3990 old_net_conf
->verify_alg
, p
->verify_alg
);
3993 verify_tfm
= drbd_crypto_alloc_digest_safe(device
,
3994 p
->verify_alg
, "verify-alg");
3995 if (IS_ERR(verify_tfm
)) {
4001 if (apv
>= 89 && strcmp(old_net_conf
->csums_alg
, p
->csums_alg
)) {
4002 if (device
->state
.conn
== C_WF_REPORT_PARAMS
) {
4003 drbd_err(device
, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
4004 old_net_conf
->csums_alg
, p
->csums_alg
);
4007 csums_tfm
= drbd_crypto_alloc_digest_safe(device
,
4008 p
->csums_alg
, "csums-alg");
4009 if (IS_ERR(csums_tfm
)) {
4015 if (apv
> 94 && new_disk_conf
) {
4016 new_disk_conf
->c_plan_ahead
= be32_to_cpu(p
->c_plan_ahead
);
4017 new_disk_conf
->c_delay_target
= be32_to_cpu(p
->c_delay_target
);
4018 new_disk_conf
->c_fill_target
= be32_to_cpu(p
->c_fill_target
);
4019 new_disk_conf
->c_max_rate
= be32_to_cpu(p
->c_max_rate
);
4021 fifo_size
= (new_disk_conf
->c_plan_ahead
* 10 * SLEEP_TIME
) / HZ
;
4022 if (fifo_size
!= device
->rs_plan_s
->size
) {
4023 new_plan
= fifo_alloc(fifo_size
);
4025 drbd_err(device
, "kmalloc of fifo_buffer failed");
4032 if (verify_tfm
|| csums_tfm
) {
4033 new_net_conf
= kzalloc(sizeof(struct net_conf
), GFP_KERNEL
);
4034 if (!new_net_conf
) {
4035 drbd_err(device
, "Allocation of new net_conf failed\n");
4039 *new_net_conf
= *old_net_conf
;
4042 strcpy(new_net_conf
->verify_alg
, p
->verify_alg
);
4043 new_net_conf
->verify_alg_len
= strlen(p
->verify_alg
) + 1;
4044 crypto_free_shash(peer_device
->connection
->verify_tfm
);
4045 peer_device
->connection
->verify_tfm
= verify_tfm
;
4046 drbd_info(device
, "using verify-alg: \"%s\"\n", p
->verify_alg
);
4049 strcpy(new_net_conf
->csums_alg
, p
->csums_alg
);
4050 new_net_conf
->csums_alg_len
= strlen(p
->csums_alg
) + 1;
4051 crypto_free_shash(peer_device
->connection
->csums_tfm
);
4052 peer_device
->connection
->csums_tfm
= csums_tfm
;
4053 drbd_info(device
, "using csums-alg: \"%s\"\n", p
->csums_alg
);
4055 rcu_assign_pointer(connection
->net_conf
, new_net_conf
);
4059 if (new_disk_conf
) {
4060 rcu_assign_pointer(device
->ldev
->disk_conf
, new_disk_conf
);
4065 old_plan
= device
->rs_plan_s
;
4066 rcu_assign_pointer(device
->rs_plan_s
, new_plan
);
4069 mutex_unlock(&connection
->resource
->conf_update
);
4072 kfree(old_net_conf
);
4073 kfree(old_disk_conf
);
4079 if (new_disk_conf
) {
4081 kfree(new_disk_conf
);
4083 mutex_unlock(&connection
->resource
->conf_update
);
4088 if (new_disk_conf
) {
4090 kfree(new_disk_conf
);
4092 mutex_unlock(&connection
->resource
->conf_update
);
4093 /* just for completeness: actually not needed,
4094 * as this is not reached if csums_tfm was ok. */
4095 crypto_free_shash(csums_tfm
);
4096 /* but free the verify_tfm again, if csums_tfm did not work out */
4097 crypto_free_shash(verify_tfm
);
4098 conn_request_state(peer_device
->connection
, NS(conn
, C_DISCONNECTING
), CS_HARD
);
4102 /* warn if the arguments differ by more than 12.5% */
4103 static void warn_if_differ_considerably(struct drbd_device
*device
,
4104 const char *s
, sector_t a
, sector_t b
)
4107 if (a
== 0 || b
== 0)
4109 d
= (a
> b
) ? (a
- b
) : (b
- a
);
4110 if (d
> (a
>>3) || d
> (b
>>3))
4111 drbd_warn(device
, "Considerable difference in %s: %llus vs. %llus\n", s
,
4112 (unsigned long long)a
, (unsigned long long)b
);
4115 static int receive_sizes(struct drbd_connection
*connection
, struct packet_info
*pi
)
4117 struct drbd_peer_device
*peer_device
;
4118 struct drbd_device
*device
;
4119 struct p_sizes
*p
= pi
->data
;
4120 struct o_qlim
*o
= (connection
->agreed_features
& DRBD_FF_WSAME
) ? p
->qlim
: NULL
;
4121 enum determine_dev_size dd
= DS_UNCHANGED
;
4122 sector_t p_size
, p_usize
, p_csize
, my_usize
;
4123 sector_t new_size
, cur_size
;
4124 int ldsc
= 0; /* local disk size changed */
4125 enum dds_flags ddsf
;
4127 peer_device
= conn_peer_device(connection
, pi
->vnr
);
4129 return config_unknown_volume(connection
, pi
);
4130 device
= peer_device
->device
;
4131 cur_size
= drbd_get_capacity(device
->this_bdev
);
4133 p_size
= be64_to_cpu(p
->d_size
);
4134 p_usize
= be64_to_cpu(p
->u_size
);
4135 p_csize
= be64_to_cpu(p
->c_size
);
4137 /* just store the peer's disk size for now.
4138 * we still need to figure out whether we accept that. */
4139 device
->p_size
= p_size
;
4141 if (get_ldev(device
)) {
4143 my_usize
= rcu_dereference(device
->ldev
->disk_conf
)->disk_size
;
4146 warn_if_differ_considerably(device
, "lower level device sizes",
4147 p_size
, drbd_get_max_capacity(device
->ldev
));
4148 warn_if_differ_considerably(device
, "user requested size",
4151 /* if this is the first connect, or an otherwise expected
4152 * param exchange, choose the minimum */
4153 if (device
->state
.conn
== C_WF_REPORT_PARAMS
)
4154 p_usize
= min_not_zero(my_usize
, p_usize
);
4156 /* Never shrink a device with usable data during connect,
4157 * or "attach" on the peer.
4158 * But allow online shrinking if we are connected. */
4159 new_size
= drbd_new_dev_size(device
, device
->ldev
, p_usize
, 0);
4160 if (new_size
< cur_size
&&
4161 device
->state
.disk
>= D_OUTDATED
&&
4162 (device
->state
.conn
< C_CONNECTED
|| device
->state
.pdsk
== D_DISKLESS
)) {
4163 drbd_err(device
, "The peer's disk size is too small! (%llu < %llu sectors)\n",
4164 (unsigned long long)new_size
, (unsigned long long)cur_size
);
4165 conn_request_state(peer_device
->connection
, NS(conn
, C_DISCONNECTING
), CS_HARD
);
4170 if (my_usize
!= p_usize
) {
4171 struct disk_conf
*old_disk_conf
, *new_disk_conf
= NULL
;
4173 new_disk_conf
= kzalloc(sizeof(struct disk_conf
), GFP_KERNEL
);
4174 if (!new_disk_conf
) {
4175 drbd_err(device
, "Allocation of new disk_conf failed\n");
4180 mutex_lock(&connection
->resource
->conf_update
);
4181 old_disk_conf
= device
->ldev
->disk_conf
;
4182 *new_disk_conf
= *old_disk_conf
;
4183 new_disk_conf
->disk_size
= p_usize
;
4185 rcu_assign_pointer(device
->ldev
->disk_conf
, new_disk_conf
);
4186 mutex_unlock(&connection
->resource
->conf_update
);
4188 kfree(old_disk_conf
);
4190 drbd_info(device
, "Peer sets u_size to %lu sectors (old: %lu)\n",
4191 (unsigned long)p_usize
, (unsigned long)my_usize
);
4197 device
->peer_max_bio_size
= be32_to_cpu(p
->max_bio_size
);
4198 /* Leave drbd_reconsider_queue_parameters() before drbd_determine_dev_size().
4199 In case we cleared the QUEUE_FLAG_DISCARD from our queue in
4200 drbd_reconsider_queue_parameters(), we can be sure that after
4201 drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
4203 ddsf
= be16_to_cpu(p
->dds_flags
);
4204 if (get_ldev(device
)) {
4205 drbd_reconsider_queue_parameters(device
, device
->ldev
, o
);
4206 dd
= drbd_determine_dev_size(device
, ddsf
, NULL
);
4210 drbd_md_sync(device
);
4213 * I am diskless, need to accept the peer's *current* size.
4214 * I must NOT accept the peers backing disk size,
4215 * it may have been larger than mine all along...
4217 * At this point, the peer knows more about my disk, or at
4218 * least about what we last agreed upon, than myself.
4219 * So if his c_size is less than his d_size, the most likely
4220 * reason is that *my* d_size was smaller last time we checked.
4222 * However, if he sends a zero current size,
4223 * take his (user-capped or) backing disk size anyways.
4225 * Unless of course he does not have a disk himself.
4226 * In which case we ignore this completely.
4228 sector_t new_size
= p_csize
?: p_usize
?: p_size
;
4229 drbd_reconsider_queue_parameters(device
, NULL
, o
);
4230 if (new_size
== 0) {
4231 /* Ignore, peer does not know nothing. */
4232 } else if (new_size
== cur_size
) {
4234 } else if (cur_size
!= 0 && p_size
== 0) {
4235 drbd_warn(device
, "Ignored diskless peer device size (peer:%llu != me:%llu sectors)!\n",
4236 (unsigned long long)new_size
, (unsigned long long)cur_size
);
4237 } else if (new_size
< cur_size
&& device
->state
.role
== R_PRIMARY
) {
4238 drbd_err(device
, "The peer's device size is too small! (%llu < %llu sectors); demote me first!\n",
4239 (unsigned long long)new_size
, (unsigned long long)cur_size
);
4240 conn_request_state(peer_device
->connection
, NS(conn
, C_DISCONNECTING
), CS_HARD
);
4243 /* I believe the peer, if
4244 * - I don't have a current size myself
4245 * - we agree on the size anyways
4246 * - I do have a current size, am Secondary,
4247 * and he has the only disk
4248 * - I do have a current size, am Primary,
4249 * and he has the only disk,
4250 * which is larger than my current size
4252 drbd_set_my_capacity(device
, new_size
);
4256 if (get_ldev(device
)) {
4257 if (device
->ldev
->known_size
!= drbd_get_capacity(device
->ldev
->backing_bdev
)) {
4258 device
->ldev
->known_size
= drbd_get_capacity(device
->ldev
->backing_bdev
);
4265 if (device
->state
.conn
> C_WF_REPORT_PARAMS
) {
4266 if (be64_to_cpu(p
->c_size
) !=
4267 drbd_get_capacity(device
->this_bdev
) || ldsc
) {
4268 /* we have different sizes, probably peer
4269 * needs to know my new size... */
4270 drbd_send_sizes(peer_device
, 0, ddsf
);
4272 if (test_and_clear_bit(RESIZE_PENDING
, &device
->flags
) ||
4273 (dd
== DS_GREW
&& device
->state
.conn
== C_CONNECTED
)) {
4274 if (device
->state
.pdsk
>= D_INCONSISTENT
&&
4275 device
->state
.disk
>= D_INCONSISTENT
) {
4276 if (ddsf
& DDSF_NO_RESYNC
)
4277 drbd_info(device
, "Resync of new storage suppressed with --assume-clean\n");
4279 resync_after_online_grow(device
);
4281 set_bit(RESYNC_AFTER_NEG
, &device
->flags
);
4288 static int receive_uuids(struct drbd_connection
*connection
, struct packet_info
*pi
)
4290 struct drbd_peer_device
*peer_device
;
4291 struct drbd_device
*device
;
4292 struct p_uuids
*p
= pi
->data
;
4294 int i
, updated_uuids
= 0;
4296 peer_device
= conn_peer_device(connection
, pi
->vnr
);
4298 return config_unknown_volume(connection
, pi
);
4299 device
= peer_device
->device
;
4301 p_uuid
= kmalloc_array(UI_EXTENDED_SIZE
, sizeof(*p_uuid
), GFP_NOIO
);
4303 drbd_err(device
, "kmalloc of p_uuid failed\n");
4307 for (i
= UI_CURRENT
; i
< UI_EXTENDED_SIZE
; i
++)
4308 p_uuid
[i
] = be64_to_cpu(p
->uuid
[i
]);
4310 kfree(device
->p_uuid
);
4311 device
->p_uuid
= p_uuid
;
4313 if ((device
->state
.conn
< C_CONNECTED
|| device
->state
.pdsk
== D_DISKLESS
) &&
4314 device
->state
.disk
< D_INCONSISTENT
&&
4315 device
->state
.role
== R_PRIMARY
&&
4316 (device
->ed_uuid
& ~((u64
)1)) != (p_uuid
[UI_CURRENT
] & ~((u64
)1))) {
4317 drbd_err(device
, "Can only connect to data with current UUID=%016llX\n",
4318 (unsigned long long)device
->ed_uuid
);
4319 conn_request_state(peer_device
->connection
, NS(conn
, C_DISCONNECTING
), CS_HARD
);
4323 if (get_ldev(device
)) {
4324 int skip_initial_sync
=
4325 device
->state
.conn
== C_CONNECTED
&&
4326 peer_device
->connection
->agreed_pro_version
>= 90 &&
4327 device
->ldev
->md
.uuid
[UI_CURRENT
] == UUID_JUST_CREATED
&&
4328 (p_uuid
[UI_FLAGS
] & 8);
4329 if (skip_initial_sync
) {
4330 drbd_info(device
, "Accepted new current UUID, preparing to skip initial sync\n");
4331 drbd_bitmap_io(device
, &drbd_bmio_clear_n_write
,
4332 "clear_n_write from receive_uuids",
4333 BM_LOCKED_TEST_ALLOWED
);
4334 _drbd_uuid_set(device
, UI_CURRENT
, p_uuid
[UI_CURRENT
]);
4335 _drbd_uuid_set(device
, UI_BITMAP
, 0);
4336 _drbd_set_state(_NS2(device
, disk
, D_UP_TO_DATE
, pdsk
, D_UP_TO_DATE
),
4338 drbd_md_sync(device
);
4342 } else if (device
->state
.disk
< D_INCONSISTENT
&&
4343 device
->state
.role
== R_PRIMARY
) {
4344 /* I am a diskless primary, the peer just created a new current UUID
4346 updated_uuids
= drbd_set_ed_uuid(device
, p_uuid
[UI_CURRENT
]);
4349 /* Before we test for the disk state, we should wait until an eventually
4350 ongoing cluster wide state change is finished. That is important if
4351 we are primary and are detaching from our disk. We need to see the
4352 new disk state... */
4353 mutex_lock(device
->state_mutex
);
4354 mutex_unlock(device
->state_mutex
);
4355 if (device
->state
.conn
>= C_CONNECTED
&& device
->state
.disk
< D_INCONSISTENT
)
4356 updated_uuids
|= drbd_set_ed_uuid(device
, p_uuid
[UI_CURRENT
]);
4359 drbd_print_uuids(device
, "receiver updated UUIDs to");
4365 * convert_state() - Converts the peer's view of the cluster state to our point of view
4366 * @ps: The state as seen by the peer.
4368 static union drbd_state
convert_state(union drbd_state ps
)
4370 union drbd_state ms
;
4372 static enum drbd_conns c_tab
[] = {
4373 [C_WF_REPORT_PARAMS
] = C_WF_REPORT_PARAMS
,
4374 [C_CONNECTED
] = C_CONNECTED
,
4376 [C_STARTING_SYNC_S
] = C_STARTING_SYNC_T
,
4377 [C_STARTING_SYNC_T
] = C_STARTING_SYNC_S
,
4378 [C_DISCONNECTING
] = C_TEAR_DOWN
, /* C_NETWORK_FAILURE, */
4379 [C_VERIFY_S
] = C_VERIFY_T
,
4385 ms
.conn
= c_tab
[ps
.conn
];
4390 ms
.peer_isp
= (ps
.aftr_isp
| ps
.user_isp
);
4395 static int receive_req_state(struct drbd_connection
*connection
, struct packet_info
*pi
)
4397 struct drbd_peer_device
*peer_device
;
4398 struct drbd_device
*device
;
4399 struct p_req_state
*p
= pi
->data
;
4400 union drbd_state mask
, val
;
4401 enum drbd_state_rv rv
;
4403 peer_device
= conn_peer_device(connection
, pi
->vnr
);
4406 device
= peer_device
->device
;
4408 mask
.i
= be32_to_cpu(p
->mask
);
4409 val
.i
= be32_to_cpu(p
->val
);
4411 if (test_bit(RESOLVE_CONFLICTS
, &peer_device
->connection
->flags
) &&
4412 mutex_is_locked(device
->state_mutex
)) {
4413 drbd_send_sr_reply(peer_device
, SS_CONCURRENT_ST_CHG
);
4417 mask
= convert_state(mask
);
4418 val
= convert_state(val
);
4420 rv
= drbd_change_state(device
, CS_VERBOSE
, mask
, val
);
4421 drbd_send_sr_reply(peer_device
, rv
);
4423 drbd_md_sync(device
);
4428 static int receive_req_conn_state(struct drbd_connection
*connection
, struct packet_info
*pi
)
4430 struct p_req_state
*p
= pi
->data
;
4431 union drbd_state mask
, val
;
4432 enum drbd_state_rv rv
;
4434 mask
.i
= be32_to_cpu(p
->mask
);
4435 val
.i
= be32_to_cpu(p
->val
);
4437 if (test_bit(RESOLVE_CONFLICTS
, &connection
->flags
) &&
4438 mutex_is_locked(&connection
->cstate_mutex
)) {
4439 conn_send_sr_reply(connection
, SS_CONCURRENT_ST_CHG
);
4443 mask
= convert_state(mask
);
4444 val
= convert_state(val
);
4446 rv
= conn_request_state(connection
, mask
, val
, CS_VERBOSE
| CS_LOCAL_ONLY
| CS_IGN_OUTD_FAIL
);
4447 conn_send_sr_reply(connection
, rv
);
4452 static int receive_state(struct drbd_connection
*connection
, struct packet_info
*pi
)
4454 struct drbd_peer_device
*peer_device
;
4455 struct drbd_device
*device
;
4456 struct p_state
*p
= pi
->data
;
4457 union drbd_state os
, ns
, peer_state
;
4458 enum drbd_disk_state real_peer_disk
;
4459 enum chg_state_flags cs_flags
;
4462 peer_device
= conn_peer_device(connection
, pi
->vnr
);
4464 return config_unknown_volume(connection
, pi
);
4465 device
= peer_device
->device
;
4467 peer_state
.i
= be32_to_cpu(p
->state
);
4469 real_peer_disk
= peer_state
.disk
;
4470 if (peer_state
.disk
== D_NEGOTIATING
) {
4471 real_peer_disk
= device
->p_uuid
[UI_FLAGS
] & 4 ? D_INCONSISTENT
: D_CONSISTENT
;
4472 drbd_info(device
, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk
));
4475 spin_lock_irq(&device
->resource
->req_lock
);
4477 os
= ns
= drbd_read_state(device
);
4478 spin_unlock_irq(&device
->resource
->req_lock
);
4480 /* If some other part of the code (ack_receiver thread, timeout)
4481 * already decided to close the connection again,
4482 * we must not "re-establish" it here. */
4483 if (os
.conn
<= C_TEAR_DOWN
)
4486 /* If this is the "end of sync" confirmation, usually the peer disk
4487 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
4488 * set) resync started in PausedSyncT, or if the timing of pause-/
4489 * unpause-sync events has been "just right", the peer disk may
4490 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
4492 if ((os
.pdsk
== D_INCONSISTENT
|| os
.pdsk
== D_CONSISTENT
) &&
4493 real_peer_disk
== D_UP_TO_DATE
&&
4494 os
.conn
> C_CONNECTED
&& os
.disk
== D_UP_TO_DATE
) {
4495 /* If we are (becoming) SyncSource, but peer is still in sync
4496 * preparation, ignore its uptodate-ness to avoid flapping, it
4497 * will change to inconsistent once the peer reaches active
4499 * It may have changed syncer-paused flags, however, so we
4500 * cannot ignore this completely. */
4501 if (peer_state
.conn
> C_CONNECTED
&&
4502 peer_state
.conn
< C_SYNC_SOURCE
)
4503 real_peer_disk
= D_INCONSISTENT
;
4505 /* if peer_state changes to connected at the same time,
4506 * it explicitly notifies us that it finished resync.
4507 * Maybe we should finish it up, too? */
4508 else if (os
.conn
>= C_SYNC_SOURCE
&&
4509 peer_state
.conn
== C_CONNECTED
) {
4510 if (drbd_bm_total_weight(device
) <= device
->rs_failed
)
4511 drbd_resync_finished(device
);
4516 /* explicit verify finished notification, stop sector reached. */
4517 if (os
.conn
== C_VERIFY_T
&& os
.disk
== D_UP_TO_DATE
&&
4518 peer_state
.conn
== C_CONNECTED
&& real_peer_disk
== D_UP_TO_DATE
) {
4519 ov_out_of_sync_print(device
);
4520 drbd_resync_finished(device
);
4524 /* peer says his disk is inconsistent, while we think it is uptodate,
4525 * and this happens while the peer still thinks we have a sync going on,
4526 * but we think we are already done with the sync.
4527 * We ignore this to avoid flapping pdsk.
4528 * This should not happen, if the peer is a recent version of drbd. */
4529 if (os
.pdsk
== D_UP_TO_DATE
&& real_peer_disk
== D_INCONSISTENT
&&
4530 os
.conn
== C_CONNECTED
&& peer_state
.conn
> C_SYNC_SOURCE
)
4531 real_peer_disk
= D_UP_TO_DATE
;
4533 if (ns
.conn
== C_WF_REPORT_PARAMS
)
4534 ns
.conn
= C_CONNECTED
;
4536 if (peer_state
.conn
== C_AHEAD
)
4540 * if (primary and diskless and peer uuid != effective uuid)
4541 * abort attach on peer;
4543 * If this node does not have good data, was already connected, but
4544 * the peer did a late attach only now, trying to "negotiate" with me,
4545 * AND I am currently Primary, possibly frozen, with some specific
4546 * "effective" uuid, this should never be reached, really, because
4547 * we first send the uuids, then the current state.
4549 * In this scenario, we already dropped the connection hard
4550 * when we received the unsuitable uuids (receive_uuids().
4552 * Should we want to change this, that is: not drop the connection in
4553 * receive_uuids() already, then we would need to add a branch here
4554 * that aborts the attach of "unsuitable uuids" on the peer in case
4555 * this node is currently Diskless Primary.
4558 if (device
->p_uuid
&& peer_state
.disk
>= D_NEGOTIATING
&&
4559 get_ldev_if_state(device
, D_NEGOTIATING
)) {
4560 int cr
; /* consider resync */
4562 /* if we established a new connection */
4563 cr
= (os
.conn
< C_CONNECTED
);
4564 /* if we had an established connection
4565 * and one of the nodes newly attaches a disk */
4566 cr
|= (os
.conn
== C_CONNECTED
&&
4567 (peer_state
.disk
== D_NEGOTIATING
||
4568 os
.disk
== D_NEGOTIATING
));
4569 /* if we have both been inconsistent, and the peer has been
4570 * forced to be UpToDate with --force */
4571 cr
|= test_bit(CONSIDER_RESYNC
, &device
->flags
);
4572 /* if we had been plain connected, and the admin requested to
4573 * start a sync by "invalidate" or "invalidate-remote" */
4574 cr
|= (os
.conn
== C_CONNECTED
&&
4575 (peer_state
.conn
>= C_STARTING_SYNC_S
&&
4576 peer_state
.conn
<= C_WF_BITMAP_T
));
4579 ns
.conn
= drbd_sync_handshake(peer_device
, peer_state
.role
, real_peer_disk
);
4582 if (ns
.conn
== C_MASK
) {
4583 ns
.conn
= C_CONNECTED
;
4584 if (device
->state
.disk
== D_NEGOTIATING
) {
4585 drbd_force_state(device
, NS(disk
, D_FAILED
));
4586 } else if (peer_state
.disk
== D_NEGOTIATING
) {
4587 drbd_err(device
, "Disk attach process on the peer node was aborted.\n");
4588 peer_state
.disk
= D_DISKLESS
;
4589 real_peer_disk
= D_DISKLESS
;
4591 if (test_and_clear_bit(CONN_DRY_RUN
, &peer_device
->connection
->flags
))
4593 D_ASSERT(device
, os
.conn
== C_WF_REPORT_PARAMS
);
4594 conn_request_state(peer_device
->connection
, NS(conn
, C_DISCONNECTING
), CS_HARD
);
4600 spin_lock_irq(&device
->resource
->req_lock
);
4601 if (os
.i
!= drbd_read_state(device
).i
)
4603 clear_bit(CONSIDER_RESYNC
, &device
->flags
);
4604 ns
.peer
= peer_state
.role
;
4605 ns
.pdsk
= real_peer_disk
;
4606 ns
.peer_isp
= (peer_state
.aftr_isp
| peer_state
.user_isp
);
4607 if ((ns
.conn
== C_CONNECTED
|| ns
.conn
== C_WF_BITMAP_S
) && ns
.disk
== D_NEGOTIATING
)
4608 ns
.disk
= device
->new_state_tmp
.disk
;
4609 cs_flags
= CS_VERBOSE
+ (os
.conn
< C_CONNECTED
&& ns
.conn
>= C_CONNECTED
? 0 : CS_HARD
);
4610 if (ns
.pdsk
== D_CONSISTENT
&& drbd_suspended(device
) && ns
.conn
== C_CONNECTED
&& os
.conn
< C_CONNECTED
&&
4611 test_bit(NEW_CUR_UUID
, &device
->flags
)) {
4612 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4613 for temporal network outages! */
4614 spin_unlock_irq(&device
->resource
->req_lock
);
4615 drbd_err(device
, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4616 tl_clear(peer_device
->connection
);
4617 drbd_uuid_new_current(device
);
4618 clear_bit(NEW_CUR_UUID
, &device
->flags
);
4619 conn_request_state(peer_device
->connection
, NS2(conn
, C_PROTOCOL_ERROR
, susp
, 0), CS_HARD
);
4622 rv
= _drbd_set_state(device
, ns
, cs_flags
, NULL
);
4623 ns
= drbd_read_state(device
);
4624 spin_unlock_irq(&device
->resource
->req_lock
);
4626 if (rv
< SS_SUCCESS
) {
4627 conn_request_state(peer_device
->connection
, NS(conn
, C_DISCONNECTING
), CS_HARD
);
4631 if (os
.conn
> C_WF_REPORT_PARAMS
) {
4632 if (ns
.conn
> C_CONNECTED
&& peer_state
.conn
<= C_CONNECTED
&&
4633 peer_state
.disk
!= D_NEGOTIATING
) {
4634 /* we want resync, peer has not yet decided to sync... */
4635 /* Nowadays only used when forcing a node into primary role and
4636 setting its disk to UpToDate with that */
4637 drbd_send_uuids(peer_device
);
4638 drbd_send_current_state(peer_device
);
4642 clear_bit(DISCARD_MY_DATA
, &device
->flags
);
4644 drbd_md_sync(device
); /* update connected indicator, la_size_sect, ... */
4649 static int receive_sync_uuid(struct drbd_connection
*connection
, struct packet_info
*pi
)
4651 struct drbd_peer_device
*peer_device
;
4652 struct drbd_device
*device
;
4653 struct p_rs_uuid
*p
= pi
->data
;
4655 peer_device
= conn_peer_device(connection
, pi
->vnr
);
4658 device
= peer_device
->device
;
4660 wait_event(device
->misc_wait
,
4661 device
->state
.conn
== C_WF_SYNC_UUID
||
4662 device
->state
.conn
== C_BEHIND
||
4663 device
->state
.conn
< C_CONNECTED
||
4664 device
->state
.disk
< D_NEGOTIATING
);
4666 /* D_ASSERT(device, device->state.conn == C_WF_SYNC_UUID ); */
4668 /* Here the _drbd_uuid_ functions are right, current should
4669 _not_ be rotated into the history */
4670 if (get_ldev_if_state(device
, D_NEGOTIATING
)) {
4671 _drbd_uuid_set(device
, UI_CURRENT
, be64_to_cpu(p
->uuid
));
4672 _drbd_uuid_set(device
, UI_BITMAP
, 0UL);
4674 drbd_print_uuids(device
, "updated sync uuid");
4675 drbd_start_resync(device
, C_SYNC_TARGET
);
4679 drbd_err(device
, "Ignoring SyncUUID packet!\n");
4685 * receive_bitmap_plain
4687 * Return 0 when done, 1 when another iteration is needed, and a negative error
4688 * code upon failure.
4691 receive_bitmap_plain(struct drbd_peer_device
*peer_device
, unsigned int size
,
4692 unsigned long *p
, struct bm_xfer_ctx
*c
)
4694 unsigned int data_size
= DRBD_SOCKET_BUFFER_SIZE
-
4695 drbd_header_size(peer_device
->connection
);
4696 unsigned int num_words
= min_t(size_t, data_size
/ sizeof(*p
),
4697 c
->bm_words
- c
->word_offset
);
4698 unsigned int want
= num_words
* sizeof(*p
);
4702 drbd_err(peer_device
, "%s:want (%u) != size (%u)\n", __func__
, want
, size
);
4707 err
= drbd_recv_all(peer_device
->connection
, p
, want
);
4711 drbd_bm_merge_lel(peer_device
->device
, c
->word_offset
, num_words
, p
);
4713 c
->word_offset
+= num_words
;
4714 c
->bit_offset
= c
->word_offset
* BITS_PER_LONG
;
4715 if (c
->bit_offset
> c
->bm_bits
)
4716 c
->bit_offset
= c
->bm_bits
;
4721 static enum drbd_bitmap_code
dcbp_get_code(struct p_compressed_bm
*p
)
4723 return (enum drbd_bitmap_code
)(p
->encoding
& 0x0f);
4726 static int dcbp_get_start(struct p_compressed_bm
*p
)
4728 return (p
->encoding
& 0x80) != 0;
4731 static int dcbp_get_pad_bits(struct p_compressed_bm
*p
)
4733 return (p
->encoding
>> 4) & 0x7;
4739 * Return 0 when done, 1 when another iteration is needed, and a negative error
4740 * code upon failure.
4743 recv_bm_rle_bits(struct drbd_peer_device
*peer_device
,
4744 struct p_compressed_bm
*p
,
4745 struct bm_xfer_ctx
*c
,
4748 struct bitstream bs
;
4752 unsigned long s
= c
->bit_offset
;
4754 int toggle
= dcbp_get_start(p
);
4758 bitstream_init(&bs
, p
->code
, len
, dcbp_get_pad_bits(p
));
4760 bits
= bitstream_get_bits(&bs
, &look_ahead
, 64);
4764 for (have
= bits
; have
> 0; s
+= rl
, toggle
= !toggle
) {
4765 bits
= vli_decode_bits(&rl
, look_ahead
);
4771 if (e
>= c
->bm_bits
) {
4772 drbd_err(peer_device
, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e
);
4775 _drbd_bm_set_bits(peer_device
->device
, s
, e
);
4779 drbd_err(peer_device
, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4780 have
, bits
, look_ahead
,
4781 (unsigned int)(bs
.cur
.b
- p
->code
),
4782 (unsigned int)bs
.buf_len
);
4785 /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4786 if (likely(bits
< 64))
4787 look_ahead
>>= bits
;
4792 bits
= bitstream_get_bits(&bs
, &tmp
, 64 - have
);
4795 look_ahead
|= tmp
<< have
;
4800 bm_xfer_ctx_bit_to_word_offset(c
);
4802 return (s
!= c
->bm_bits
);
4808 * Return 0 when done, 1 when another iteration is needed, and a negative error
4809 * code upon failure.
4812 decode_bitmap_c(struct drbd_peer_device
*peer_device
,
4813 struct p_compressed_bm
*p
,
4814 struct bm_xfer_ctx
*c
,
4817 if (dcbp_get_code(p
) == RLE_VLI_Bits
)
4818 return recv_bm_rle_bits(peer_device
, p
, c
, len
- sizeof(*p
));
4820 /* other variants had been implemented for evaluation,
4821 * but have been dropped as this one turned out to be "best"
4822 * during all our tests. */
4824 drbd_err(peer_device
, "receive_bitmap_c: unknown encoding %u\n", p
->encoding
);
4825 conn_request_state(peer_device
->connection
, NS(conn
, C_PROTOCOL_ERROR
), CS_HARD
);
4829 void INFO_bm_xfer_stats(struct drbd_device
*device
,
4830 const char *direction
, struct bm_xfer_ctx
*c
)
4832 /* what would it take to transfer it "plaintext" */
4833 unsigned int header_size
= drbd_header_size(first_peer_device(device
)->connection
);
4834 unsigned int data_size
= DRBD_SOCKET_BUFFER_SIZE
- header_size
;
4835 unsigned int plain
=
4836 header_size
* (DIV_ROUND_UP(c
->bm_words
, data_size
) + 1) +
4837 c
->bm_words
* sizeof(unsigned long);
4838 unsigned int total
= c
->bytes
[0] + c
->bytes
[1];
4841 /* total can not be zero. but just in case: */
4845 /* don't report if not compressed */
4849 /* total < plain. check for overflow, still */
4850 r
= (total
> UINT_MAX
/1000) ? (total
/ (plain
/1000))
4851 : (1000 * total
/ plain
);
4857 drbd_info(device
, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4858 "total %u; compression: %u.%u%%\n",
4860 c
->bytes
[1], c
->packets
[1],
4861 c
->bytes
[0], c
->packets
[0],
4862 total
, r
/10, r
% 10);
4865 /* Since we are processing the bitfield from lower addresses to higher,
4866 it does not matter if the process it in 32 bit chunks or 64 bit
4867 chunks as long as it is little endian. (Understand it as byte stream,
4868 beginning with the lowest byte...) If we would use big endian
4869 we would need to process it from the highest address to the lowest,
4870 in order to be agnostic to the 32 vs 64 bits issue.
4872 returns 0 on failure, 1 if we successfully received it. */
4873 static int receive_bitmap(struct drbd_connection
*connection
, struct packet_info
*pi
)
4875 struct drbd_peer_device
*peer_device
;
4876 struct drbd_device
*device
;
4877 struct bm_xfer_ctx c
;
4880 peer_device
= conn_peer_device(connection
, pi
->vnr
);
4883 device
= peer_device
->device
;
4885 drbd_bm_lock(device
, "receive bitmap", BM_LOCKED_SET_ALLOWED
);
4886 /* you are supposed to send additional out-of-sync information
4887 * if you actually set bits during this phase */
4889 c
= (struct bm_xfer_ctx
) {
4890 .bm_bits
= drbd_bm_bits(device
),
4891 .bm_words
= drbd_bm_words(device
),
4895 if (pi
->cmd
== P_BITMAP
)
4896 err
= receive_bitmap_plain(peer_device
, pi
->size
, pi
->data
, &c
);
4897 else if (pi
->cmd
== P_COMPRESSED_BITMAP
) {
4898 /* MAYBE: sanity check that we speak proto >= 90,
4899 * and the feature is enabled! */
4900 struct p_compressed_bm
*p
= pi
->data
;
4902 if (pi
->size
> DRBD_SOCKET_BUFFER_SIZE
- drbd_header_size(connection
)) {
4903 drbd_err(device
, "ReportCBitmap packet too large\n");
4907 if (pi
->size
<= sizeof(*p
)) {
4908 drbd_err(device
, "ReportCBitmap packet too small (l:%u)\n", pi
->size
);
4912 err
= drbd_recv_all(peer_device
->connection
, p
, pi
->size
);
4915 err
= decode_bitmap_c(peer_device
, p
, &c
, pi
->size
);
4917 drbd_warn(device
, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi
->cmd
);
4922 c
.packets
[pi
->cmd
== P_BITMAP
]++;
4923 c
.bytes
[pi
->cmd
== P_BITMAP
] += drbd_header_size(connection
) + pi
->size
;
4930 err
= drbd_recv_header(peer_device
->connection
, pi
);
4935 INFO_bm_xfer_stats(device
, "receive", &c
);
4937 if (device
->state
.conn
== C_WF_BITMAP_T
) {
4938 enum drbd_state_rv rv
;
4940 err
= drbd_send_bitmap(device
);
4943 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4944 rv
= _drbd_request_state(device
, NS(conn
, C_WF_SYNC_UUID
), CS_VERBOSE
);
4945 D_ASSERT(device
, rv
== SS_SUCCESS
);
4946 } else if (device
->state
.conn
!= C_WF_BITMAP_S
) {
4947 /* admin may have requested C_DISCONNECTING,
4948 * other threads may have noticed network errors */
4949 drbd_info(device
, "unexpected cstate (%s) in receive_bitmap\n",
4950 drbd_conn_str(device
->state
.conn
));
4955 drbd_bm_unlock(device
);
4956 if (!err
&& device
->state
.conn
== C_WF_BITMAP_S
)
4957 drbd_start_resync(device
, C_SYNC_SOURCE
);
4961 static int receive_skip(struct drbd_connection
*connection
, struct packet_info
*pi
)
4963 drbd_warn(connection
, "skipping unknown optional packet type %d, l: %d!\n",
4966 return ignore_remaining_packet(connection
, pi
);
4969 static int receive_UnplugRemote(struct drbd_connection
*connection
, struct packet_info
*pi
)
4971 /* Make sure we've acked all the TCP data associated
4972 * with the data requests being unplugged */
4973 drbd_tcp_quickack(connection
->data
.socket
);
4978 static int receive_out_of_sync(struct drbd_connection
*connection
, struct packet_info
*pi
)
4980 struct drbd_peer_device
*peer_device
;
4981 struct drbd_device
*device
;
4982 struct p_block_desc
*p
= pi
->data
;
4984 peer_device
= conn_peer_device(connection
, pi
->vnr
);
4987 device
= peer_device
->device
;
4989 switch (device
->state
.conn
) {
4990 case C_WF_SYNC_UUID
:
4995 drbd_err(device
, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4996 drbd_conn_str(device
->state
.conn
));
4999 drbd_set_out_of_sync(device
, be64_to_cpu(p
->sector
), be32_to_cpu(p
->blksize
));
5004 static int receive_rs_deallocated(struct drbd_connection
*connection
, struct packet_info
*pi
)
5006 struct drbd_peer_device
*peer_device
;
5007 struct p_block_desc
*p
= pi
->data
;
5008 struct drbd_device
*device
;
5012 peer_device
= conn_peer_device(connection
, pi
->vnr
);
5015 device
= peer_device
->device
;
5017 sector
= be64_to_cpu(p
->sector
);
5018 size
= be32_to_cpu(p
->blksize
);
5020 dec_rs_pending(device
);
5022 if (get_ldev(device
)) {
5023 struct drbd_peer_request
*peer_req
;
5024 const int op
= REQ_OP_WRITE_ZEROES
;
5026 peer_req
= drbd_alloc_peer_req(peer_device
, ID_SYNCER
, sector
,
5033 peer_req
->w
.cb
= e_end_resync_block
;
5034 peer_req
->submit_jif
= jiffies
;
5035 peer_req
->flags
|= EE_TRIM
;
5037 spin_lock_irq(&device
->resource
->req_lock
);
5038 list_add_tail(&peer_req
->w
.list
, &device
->sync_ee
);
5039 spin_unlock_irq(&device
->resource
->req_lock
);
5041 atomic_add(pi
->size
>> 9, &device
->rs_sect_ev
);
5042 err
= drbd_submit_peer_request(device
, peer_req
, op
, 0, DRBD_FAULT_RS_WR
);
5045 spin_lock_irq(&device
->resource
->req_lock
);
5046 list_del(&peer_req
->w
.list
);
5047 spin_unlock_irq(&device
->resource
->req_lock
);
5049 drbd_free_peer_req(device
, peer_req
);
5055 inc_unacked(device
);
5057 /* No put_ldev() here. Gets called in drbd_endio_write_sec_final(),
5058 as well as drbd_rs_complete_io() */
5061 drbd_rs_complete_io(device
, sector
);
5062 drbd_send_ack_ex(peer_device
, P_NEG_ACK
, sector
, size
, ID_SYNCER
);
5065 atomic_add(size
>> 9, &device
->rs_sect_in
);
5072 unsigned int pkt_size
;
5073 int (*fn
)(struct drbd_connection
*, struct packet_info
*);
5076 static struct data_cmd drbd_cmd_handler
[] = {
5077 [P_DATA
] = { 1, sizeof(struct p_data
), receive_Data
},
5078 [P_DATA_REPLY
] = { 1, sizeof(struct p_data
), receive_DataReply
},
5079 [P_RS_DATA_REPLY
] = { 1, sizeof(struct p_data
), receive_RSDataReply
} ,
5080 [P_BARRIER
] = { 0, sizeof(struct p_barrier
), receive_Barrier
} ,
5081 [P_BITMAP
] = { 1, 0, receive_bitmap
} ,
5082 [P_COMPRESSED_BITMAP
] = { 1, 0, receive_bitmap
} ,
5083 [P_UNPLUG_REMOTE
] = { 0, 0, receive_UnplugRemote
},
5084 [P_DATA_REQUEST
] = { 0, sizeof(struct p_block_req
), receive_DataRequest
},
5085 [P_RS_DATA_REQUEST
] = { 0, sizeof(struct p_block_req
), receive_DataRequest
},
5086 [P_SYNC_PARAM
] = { 1, 0, receive_SyncParam
},
5087 [P_SYNC_PARAM89
] = { 1, 0, receive_SyncParam
},
5088 [P_PROTOCOL
] = { 1, sizeof(struct p_protocol
), receive_protocol
},
5089 [P_UUIDS
] = { 0, sizeof(struct p_uuids
), receive_uuids
},
5090 [P_SIZES
] = { 0, sizeof(struct p_sizes
), receive_sizes
},
5091 [P_STATE
] = { 0, sizeof(struct p_state
), receive_state
},
5092 [P_STATE_CHG_REQ
] = { 0, sizeof(struct p_req_state
), receive_req_state
},
5093 [P_SYNC_UUID
] = { 0, sizeof(struct p_rs_uuid
), receive_sync_uuid
},
5094 [P_OV_REQUEST
] = { 0, sizeof(struct p_block_req
), receive_DataRequest
},
5095 [P_OV_REPLY
] = { 1, sizeof(struct p_block_req
), receive_DataRequest
},
5096 [P_CSUM_RS_REQUEST
] = { 1, sizeof(struct p_block_req
), receive_DataRequest
},
5097 [P_RS_THIN_REQ
] = { 0, sizeof(struct p_block_req
), receive_DataRequest
},
5098 [P_DELAY_PROBE
] = { 0, sizeof(struct p_delay_probe93
), receive_skip
},
5099 [P_OUT_OF_SYNC
] = { 0, sizeof(struct p_block_desc
), receive_out_of_sync
},
5100 [P_CONN_ST_CHG_REQ
] = { 0, sizeof(struct p_req_state
), receive_req_conn_state
},
5101 [P_PROTOCOL_UPDATE
] = { 1, sizeof(struct p_protocol
), receive_protocol
},
5102 [P_TRIM
] = { 0, sizeof(struct p_trim
), receive_Data
},
5103 [P_ZEROES
] = { 0, sizeof(struct p_trim
), receive_Data
},
5104 [P_RS_DEALLOCATED
] = { 0, sizeof(struct p_block_desc
), receive_rs_deallocated
},
5105 [P_WSAME
] = { 1, sizeof(struct p_wsame
), receive_Data
},
5108 static void drbdd(struct drbd_connection
*connection
)
5110 struct packet_info pi
;
5111 size_t shs
; /* sub header size */
5114 while (get_t_state(&connection
->receiver
) == RUNNING
) {
5115 struct data_cmd
const *cmd
;
5117 drbd_thread_current_set_cpu(&connection
->receiver
);
5118 update_receiver_timing_details(connection
, drbd_recv_header_maybe_unplug
);
5119 if (drbd_recv_header_maybe_unplug(connection
, &pi
))
5122 cmd
= &drbd_cmd_handler
[pi
.cmd
];
5123 if (unlikely(pi
.cmd
>= ARRAY_SIZE(drbd_cmd_handler
) || !cmd
->fn
)) {
5124 drbd_err(connection
, "Unexpected data packet %s (0x%04x)",
5125 cmdname(pi
.cmd
), pi
.cmd
);
5129 shs
= cmd
->pkt_size
;
5130 if (pi
.cmd
== P_SIZES
&& connection
->agreed_features
& DRBD_FF_WSAME
)
5131 shs
+= sizeof(struct o_qlim
);
5132 if (pi
.size
> shs
&& !cmd
->expect_payload
) {
5133 drbd_err(connection
, "No payload expected %s l:%d\n",
5134 cmdname(pi
.cmd
), pi
.size
);
5137 if (pi
.size
< shs
) {
5138 drbd_err(connection
, "%s: unexpected packet size, expected:%d received:%d\n",
5139 cmdname(pi
.cmd
), (int)shs
, pi
.size
);
5144 update_receiver_timing_details(connection
, drbd_recv_all_warn
);
5145 err
= drbd_recv_all_warn(connection
, pi
.data
, shs
);
5151 update_receiver_timing_details(connection
, cmd
->fn
);
5152 err
= cmd
->fn(connection
, &pi
);
5154 drbd_err(connection
, "error receiving %s, e: %d l: %d!\n",
5155 cmdname(pi
.cmd
), err
, pi
.size
);
5162 conn_request_state(connection
, NS(conn
, C_PROTOCOL_ERROR
), CS_HARD
);
5165 static void conn_disconnect(struct drbd_connection
*connection
)
5167 struct drbd_peer_device
*peer_device
;
5171 if (connection
->cstate
== C_STANDALONE
)
5174 /* We are about to start the cleanup after connection loss.
5175 * Make sure drbd_make_request knows about that.
5176 * Usually we should be in some network failure state already,
5177 * but just in case we are not, we fix it up here.
5179 conn_request_state(connection
, NS(conn
, C_NETWORK_FAILURE
), CS_HARD
);
5181 /* ack_receiver does not clean up anything. it must not interfere, either */
5182 drbd_thread_stop(&connection
->ack_receiver
);
5183 if (connection
->ack_sender
) {
5184 destroy_workqueue(connection
->ack_sender
);
5185 connection
->ack_sender
= NULL
;
5187 drbd_free_sock(connection
);
5190 idr_for_each_entry(&connection
->peer_devices
, peer_device
, vnr
) {
5191 struct drbd_device
*device
= peer_device
->device
;
5192 kref_get(&device
->kref
);
5194 drbd_disconnected(peer_device
);
5195 kref_put(&device
->kref
, drbd_destroy_device
);
5200 if (!list_empty(&connection
->current_epoch
->list
))
5201 drbd_err(connection
, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
5202 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
5203 atomic_set(&connection
->current_epoch
->epoch_size
, 0);
5204 connection
->send
.seen_any_write_yet
= false;
5206 drbd_info(connection
, "Connection closed\n");
5208 if (conn_highest_role(connection
) == R_PRIMARY
&& conn_highest_pdsk(connection
) >= D_UNKNOWN
)
5209 conn_try_outdate_peer_async(connection
);
5211 spin_lock_irq(&connection
->resource
->req_lock
);
5212 oc
= connection
->cstate
;
5213 if (oc
>= C_UNCONNECTED
)
5214 _conn_request_state(connection
, NS(conn
, C_UNCONNECTED
), CS_VERBOSE
);
5216 spin_unlock_irq(&connection
->resource
->req_lock
);
5218 if (oc
== C_DISCONNECTING
)
5219 conn_request_state(connection
, NS(conn
, C_STANDALONE
), CS_VERBOSE
| CS_HARD
);
5222 static int drbd_disconnected(struct drbd_peer_device
*peer_device
)
5224 struct drbd_device
*device
= peer_device
->device
;
5227 /* wait for current activity to cease. */
5228 spin_lock_irq(&device
->resource
->req_lock
);
5229 _drbd_wait_ee_list_empty(device
, &device
->active_ee
);
5230 _drbd_wait_ee_list_empty(device
, &device
->sync_ee
);
5231 _drbd_wait_ee_list_empty(device
, &device
->read_ee
);
5232 spin_unlock_irq(&device
->resource
->req_lock
);
5234 /* We do not have data structures that would allow us to
5235 * get the rs_pending_cnt down to 0 again.
5236 * * On C_SYNC_TARGET we do not have any data structures describing
5237 * the pending RSDataRequest's we have sent.
5238 * * On C_SYNC_SOURCE there is no data structure that tracks
5239 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
5240 * And no, it is not the sum of the reference counts in the
5241 * resync_LRU. The resync_LRU tracks the whole operation including
5242 * the disk-IO, while the rs_pending_cnt only tracks the blocks
5244 drbd_rs_cancel_all(device
);
5245 device
->rs_total
= 0;
5246 device
->rs_failed
= 0;
5247 atomic_set(&device
->rs_pending_cnt
, 0);
5248 wake_up(&device
->misc_wait
);
5250 del_timer_sync(&device
->resync_timer
);
5251 resync_timer_fn(&device
->resync_timer
);
5253 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
5254 * w_make_resync_request etc. which may still be on the worker queue
5255 * to be "canceled" */
5256 drbd_flush_workqueue(&peer_device
->connection
->sender_work
);
5258 drbd_finish_peer_reqs(device
);
5260 /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
5261 might have issued a work again. The one before drbd_finish_peer_reqs() is
5262 necessary to reclain net_ee in drbd_finish_peer_reqs(). */
5263 drbd_flush_workqueue(&peer_device
->connection
->sender_work
);
5265 /* need to do it again, drbd_finish_peer_reqs() may have populated it
5266 * again via drbd_try_clear_on_disk_bm(). */
5267 drbd_rs_cancel_all(device
);
5269 kfree(device
->p_uuid
);
5270 device
->p_uuid
= NULL
;
5272 if (!drbd_suspended(device
))
5273 tl_clear(peer_device
->connection
);
5275 drbd_md_sync(device
);
5277 if (get_ldev(device
)) {
5278 drbd_bitmap_io(device
, &drbd_bm_write_copy_pages
,
5279 "write from disconnected", BM_LOCKED_CHANGE_ALLOWED
);
5283 /* tcp_close and release of sendpage pages can be deferred. I don't
5284 * want to use SO_LINGER, because apparently it can be deferred for
5285 * more than 20 seconds (longest time I checked).
5287 * Actually we don't care for exactly when the network stack does its
5288 * put_page(), but release our reference on these pages right here.
5290 i
= drbd_free_peer_reqs(device
, &device
->net_ee
);
5292 drbd_info(device
, "net_ee not empty, killed %u entries\n", i
);
5293 i
= atomic_read(&device
->pp_in_use_by_net
);
5295 drbd_info(device
, "pp_in_use_by_net = %d, expected 0\n", i
);
5296 i
= atomic_read(&device
->pp_in_use
);
5298 drbd_info(device
, "pp_in_use = %d, expected 0\n", i
);
5300 D_ASSERT(device
, list_empty(&device
->read_ee
));
5301 D_ASSERT(device
, list_empty(&device
->active_ee
));
5302 D_ASSERT(device
, list_empty(&device
->sync_ee
));
5303 D_ASSERT(device
, list_empty(&device
->done_ee
));
5309 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
5310 * we can agree on is stored in agreed_pro_version.
5312 * feature flags and the reserved array should be enough room for future
5313 * enhancements of the handshake protocol, and possible plugins...
5315 * for now, they are expected to be zero, but ignored.
5317 static int drbd_send_features(struct drbd_connection
*connection
)
5319 struct drbd_socket
*sock
;
5320 struct p_connection_features
*p
;
5322 sock
= &connection
->data
;
5323 p
= conn_prepare_command(connection
, sock
);
5326 memset(p
, 0, sizeof(*p
));
5327 p
->protocol_min
= cpu_to_be32(PRO_VERSION_MIN
);
5328 p
->protocol_max
= cpu_to_be32(PRO_VERSION_MAX
);
5329 p
->feature_flags
= cpu_to_be32(PRO_FEATURES
);
5330 return conn_send_command(connection
, sock
, P_CONNECTION_FEATURES
, sizeof(*p
), NULL
, 0);
5335 * 1 yes, we have a valid connection
5336 * 0 oops, did not work out, please try again
5337 * -1 peer talks different language,
5338 * no point in trying again, please go standalone.
5340 static int drbd_do_features(struct drbd_connection
*connection
)
5342 /* ASSERT current == connection->receiver ... */
5343 struct p_connection_features
*p
;
5344 const int expect
= sizeof(struct p_connection_features
);
5345 struct packet_info pi
;
5348 err
= drbd_send_features(connection
);
5352 err
= drbd_recv_header(connection
, &pi
);
5356 if (pi
.cmd
!= P_CONNECTION_FEATURES
) {
5357 drbd_err(connection
, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
5358 cmdname(pi
.cmd
), pi
.cmd
);
5362 if (pi
.size
!= expect
) {
5363 drbd_err(connection
, "expected ConnectionFeatures length: %u, received: %u\n",
5369 err
= drbd_recv_all_warn(connection
, p
, expect
);
5373 p
->protocol_min
= be32_to_cpu(p
->protocol_min
);
5374 p
->protocol_max
= be32_to_cpu(p
->protocol_max
);
5375 if (p
->protocol_max
== 0)
5376 p
->protocol_max
= p
->protocol_min
;
5378 if (PRO_VERSION_MAX
< p
->protocol_min
||
5379 PRO_VERSION_MIN
> p
->protocol_max
)
5382 connection
->agreed_pro_version
= min_t(int, PRO_VERSION_MAX
, p
->protocol_max
);
5383 connection
->agreed_features
= PRO_FEATURES
& be32_to_cpu(p
->feature_flags
);
5385 drbd_info(connection
, "Handshake successful: "
5386 "Agreed network protocol version %d\n", connection
->agreed_pro_version
);
5388 drbd_info(connection
, "Feature flags enabled on protocol level: 0x%x%s%s%s%s.\n",
5389 connection
->agreed_features
,
5390 connection
->agreed_features
& DRBD_FF_TRIM
? " TRIM" : "",
5391 connection
->agreed_features
& DRBD_FF_THIN_RESYNC
? " THIN_RESYNC" : "",
5392 connection
->agreed_features
& DRBD_FF_WSAME
? " WRITE_SAME" : "",
5393 connection
->agreed_features
& DRBD_FF_WZEROES
? " WRITE_ZEROES" :
5394 connection
->agreed_features
? "" : " none");
5399 drbd_err(connection
, "incompatible DRBD dialects: "
5400 "I support %d-%d, peer supports %d-%d\n",
5401 PRO_VERSION_MIN
, PRO_VERSION_MAX
,
5402 p
->protocol_min
, p
->protocol_max
);
#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
/* Stub when the kernel lacks HMAC support: authentication can never
 * succeed, tell the admin to disable cram-hmac-alg. */
static int drbd_do_auth(struct drbd_connection *connection)
{
	drbd_err(connection, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
	drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
	return -1;
}
#else
#define CHALLENGE_LEN	64

/* Return value:
	1 - auth succeeded,
	0 - failed, try again (network error),
	-1 - auth failed, don't try again.
*/

static int drbd_do_auth(struct drbd_connection *connection)
{
	struct drbd_socket *sock;
	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
	char *response = NULL;
	char *right_response = NULL;
	char *peers_ch = NULL;
	unsigned int key_len;
	char secret[SHARED_SECRET_MAX]; /* 64 byte */
	unsigned int resp_size;
	SHASH_DESC_ON_STACK(desc, connection->cram_hmac_tfm);
	struct packet_info pi;
	struct net_conf *nc;
	int err, rv;

	/* FIXME: Put the challenge/response into the preallocated socket buffer.  */

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	key_len = strlen(nc->shared_secret);
	memcpy(secret, nc->shared_secret, key_len);
	rcu_read_unlock();

	desc->tfm = connection->cram_hmac_tfm;

	rv = crypto_shash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
	if (rv) {
		drbd_err(connection, "crypto_shash_setkey() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	get_random_bytes(my_challenge, CHALLENGE_LEN);

	sock = &connection->data;
	if (!conn_prepare_command(connection, sock)) {
		rv = 0;
		goto fail;
	}
	rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
				my_challenge, CHALLENGE_LEN);
	if (!rv)
		goto fail;

	err = drbd_recv_header(connection, &pi);
	if (err) {
		rv = 0;
		goto fail;
	}

	if (pi.cmd != P_AUTH_CHALLENGE) {
		drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
			 cmdname(pi.cmd), pi.cmd);
		rv = -1;
		goto fail;
	}

	if (pi.size > CHALLENGE_LEN * 2) {
		drbd_err(connection, "expected AuthChallenge payload too big.\n");
		rv = -1;
		goto fail;
	}

	if (pi.size < CHALLENGE_LEN) {
		drbd_err(connection, "AuthChallenge payload too small.\n");
		rv = -1;
		goto fail;
	}

	peers_ch = kmalloc(pi.size, GFP_NOIO);
	if (peers_ch == NULL) {
		drbd_err(connection, "kmalloc of peers_ch failed\n");
		rv = -1;
		goto fail;
	}

	err = drbd_recv_all_warn(connection, peers_ch, pi.size);
	if (err) {
		rv = 0;
		goto fail;
	}

	/* A peer echoing our own challenge would let it replay our response. */
	if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
		drbd_err(connection, "Peer presented the same challenge!\n");
		rv = -1;
		goto fail;
	}

	resp_size = crypto_shash_digestsize(connection->cram_hmac_tfm);
	response = kmalloc(resp_size, GFP_NOIO);
	if (response == NULL) {
		drbd_err(connection, "kmalloc of response failed\n");
		rv = -1;
		goto fail;
	}

	rv = crypto_shash_digest(desc, peers_ch, pi.size, response);
	if (rv) {
		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	if (!conn_prepare_command(connection, sock)) {
		rv = 0;
		goto fail;
	}
	rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
				response, resp_size);
	if (!rv)
		goto fail;

	err = drbd_recv_header(connection, &pi);
	if (err) {
		rv = 0;
		goto fail;
	}

	if (pi.cmd != P_AUTH_RESPONSE) {
		drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
			 cmdname(pi.cmd), pi.cmd);
		rv = 0;
		goto fail;
	}

	if (pi.size != resp_size) {
		drbd_err(connection, "expected AuthResponse payload of wrong size\n");
		rv = 0;
		goto fail;
	}

	err = drbd_recv_all_warn(connection, response, resp_size);
	if (err) {
		rv = 0;
		goto fail;
	}

	right_response = kmalloc(resp_size, GFP_NOIO);
	if (right_response == NULL) {
		drbd_err(connection, "kmalloc of right_response failed\n");
		rv = -1;
		goto fail;
	}

	rv = crypto_shash_digest(desc, my_challenge, CHALLENGE_LEN,
				 right_response);
	if (rv) {
		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	rv = !memcmp(response, right_response, resp_size);

	if (rv)
		drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
		     resp_size);
	else
		rv = -1;

 fail:
	kfree(peers_ch);
	kfree(response);
	kfree(right_response);
	shash_desc_zero(desc);

	return rv;
}
#endif
5593 int drbd_receiver(struct drbd_thread
*thi
)
5595 struct drbd_connection
*connection
= thi
->connection
;
5598 drbd_info(connection
, "receiver (re)started\n");
5601 h
= conn_connect(connection
);
5603 conn_disconnect(connection
);
5604 schedule_timeout_interruptible(HZ
);
5607 drbd_warn(connection
, "Discarding network configuration.\n");
5608 conn_request_state(connection
, NS(conn
, C_DISCONNECTING
), CS_HARD
);
5613 blk_start_plug(&connection
->receiver_plug
);
5615 blk_finish_plug(&connection
->receiver_plug
);
5618 conn_disconnect(connection
);
5620 drbd_info(connection
, "receiver terminated\n");
5624 /* ********* acknowledge sender ******** */
5626 static int got_conn_RqSReply(struct drbd_connection
*connection
, struct packet_info
*pi
)
5628 struct p_req_state_reply
*p
= pi
->data
;
5629 int retcode
= be32_to_cpu(p
->retcode
);
5631 if (retcode
>= SS_SUCCESS
) {
5632 set_bit(CONN_WD_ST_CHG_OKAY
, &connection
->flags
);
5634 set_bit(CONN_WD_ST_CHG_FAIL
, &connection
->flags
);
5635 drbd_err(connection
, "Requested state change failed by peer: %s (%d)\n",
5636 drbd_set_st_err_str(retcode
), retcode
);
5638 wake_up(&connection
->ping_wait
);
5643 static int got_RqSReply(struct drbd_connection
*connection
, struct packet_info
*pi
)
5645 struct drbd_peer_device
*peer_device
;
5646 struct drbd_device
*device
;
5647 struct p_req_state_reply
*p
= pi
->data
;
5648 int retcode
= be32_to_cpu(p
->retcode
);
5650 peer_device
= conn_peer_device(connection
, pi
->vnr
);
5653 device
= peer_device
->device
;
5655 if (test_bit(CONN_WD_ST_CHG_REQ
, &connection
->flags
)) {
5656 D_ASSERT(device
, connection
->agreed_pro_version
< 100);
5657 return got_conn_RqSReply(connection
, pi
);
5660 if (retcode
>= SS_SUCCESS
) {
5661 set_bit(CL_ST_CHG_SUCCESS
, &device
->flags
);
5663 set_bit(CL_ST_CHG_FAIL
, &device
->flags
);
5664 drbd_err(device
, "Requested state change failed by peer: %s (%d)\n",
5665 drbd_set_st_err_str(retcode
), retcode
);
5667 wake_up(&device
->state_wait
);
/* got_Ping() - answer a P_PING from the peer with a ping ack. */
static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
{
	return drbd_send_ping_ack(connection);
}
5678 static int got_PingAck(struct drbd_connection
*connection
, struct packet_info
*pi
)
5680 /* restore idle timeout */
5681 connection
->meta
.socket
->sk
->sk_rcvtimeo
= connection
->net_conf
->ping_int
*HZ
;
5682 if (!test_and_set_bit(GOT_PING_ACK
, &connection
->flags
))
5683 wake_up(&connection
->ping_wait
);
5688 static int got_IsInSync(struct drbd_connection
*connection
, struct packet_info
*pi
)
5690 struct drbd_peer_device
*peer_device
;
5691 struct drbd_device
*device
;
5692 struct p_block_ack
*p
= pi
->data
;
5693 sector_t sector
= be64_to_cpu(p
->sector
);
5694 int blksize
= be32_to_cpu(p
->blksize
);
5696 peer_device
= conn_peer_device(connection
, pi
->vnr
);
5699 device
= peer_device
->device
;
5701 D_ASSERT(device
, peer_device
->connection
->agreed_pro_version
>= 89);
5703 update_peer_seq(peer_device
, be32_to_cpu(p
->seq_num
));
5705 if (get_ldev(device
)) {
5706 drbd_rs_complete_io(device
, sector
);
5707 drbd_set_in_sync(device
, sector
, blksize
);
5708 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5709 device
->rs_same_csum
+= (blksize
>> BM_BLOCK_SHIFT
);
5712 dec_rs_pending(device
);
5713 atomic_add(blksize
>> 9, &device
->rs_sect_in
);
5719 validate_req_change_req_state(struct drbd_device
*device
, u64 id
, sector_t sector
,
5720 struct rb_root
*root
, const char *func
,
5721 enum drbd_req_event what
, bool missing_ok
)
5723 struct drbd_request
*req
;
5724 struct bio_and_error m
;
5726 spin_lock_irq(&device
->resource
->req_lock
);
5727 req
= find_request(device
, root
, id
, sector
, missing_ok
, func
);
5728 if (unlikely(!req
)) {
5729 spin_unlock_irq(&device
->resource
->req_lock
);
5732 __req_mod(req
, what
, &m
);
5733 spin_unlock_irq(&device
->resource
->req_lock
);
5736 complete_master_bio(device
, &m
);
5740 static int got_BlockAck(struct drbd_connection
*connection
, struct packet_info
*pi
)
5742 struct drbd_peer_device
*peer_device
;
5743 struct drbd_device
*device
;
5744 struct p_block_ack
*p
= pi
->data
;
5745 sector_t sector
= be64_to_cpu(p
->sector
);
5746 int blksize
= be32_to_cpu(p
->blksize
);
5747 enum drbd_req_event what
;
5749 peer_device
= conn_peer_device(connection
, pi
->vnr
);
5752 device
= peer_device
->device
;
5754 update_peer_seq(peer_device
, be32_to_cpu(p
->seq_num
));
5756 if (p
->block_id
== ID_SYNCER
) {
5757 drbd_set_in_sync(device
, sector
, blksize
);
5758 dec_rs_pending(device
);
5762 case P_RS_WRITE_ACK
:
5763 what
= WRITE_ACKED_BY_PEER_AND_SIS
;
5766 what
= WRITE_ACKED_BY_PEER
;
5769 what
= RECV_ACKED_BY_PEER
;
5772 what
= CONFLICT_RESOLVED
;
5775 what
= POSTPONE_WRITE
;
5781 return validate_req_change_req_state(device
, p
->block_id
, sector
,
5782 &device
->write_requests
, __func__
,
5786 static int got_NegAck(struct drbd_connection
*connection
, struct packet_info
*pi
)
5788 struct drbd_peer_device
*peer_device
;
5789 struct drbd_device
*device
;
5790 struct p_block_ack
*p
= pi
->data
;
5791 sector_t sector
= be64_to_cpu(p
->sector
);
5792 int size
= be32_to_cpu(p
->blksize
);
5795 peer_device
= conn_peer_device(connection
, pi
->vnr
);
5798 device
= peer_device
->device
;
5800 update_peer_seq(peer_device
, be32_to_cpu(p
->seq_num
));
5802 if (p
->block_id
== ID_SYNCER
) {
5803 dec_rs_pending(device
);
5804 drbd_rs_failed_io(device
, sector
, size
);
5808 err
= validate_req_change_req_state(device
, p
->block_id
, sector
,
5809 &device
->write_requests
, __func__
,
5812 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5813 The master bio might already be completed, therefore the
5814 request is no longer in the collision hash. */
5815 /* In Protocol B we might already have got a P_RECV_ACK
5816 but then get a P_NEG_ACK afterwards. */
5817 drbd_set_out_of_sync(device
, sector
, size
);
5822 static int got_NegDReply(struct drbd_connection
*connection
, struct packet_info
*pi
)
5824 struct drbd_peer_device
*peer_device
;
5825 struct drbd_device
*device
;
5826 struct p_block_ack
*p
= pi
->data
;
5827 sector_t sector
= be64_to_cpu(p
->sector
);
5829 peer_device
= conn_peer_device(connection
, pi
->vnr
);
5832 device
= peer_device
->device
;
5834 update_peer_seq(peer_device
, be32_to_cpu(p
->seq_num
));
5836 drbd_err(device
, "Got NegDReply; Sector %llus, len %u.\n",
5837 (unsigned long long)sector
, be32_to_cpu(p
->blksize
));
5839 return validate_req_change_req_state(device
, p
->block_id
, sector
,
5840 &device
->read_requests
, __func__
,
5844 static int got_NegRSDReply(struct drbd_connection
*connection
, struct packet_info
*pi
)
5846 struct drbd_peer_device
*peer_device
;
5847 struct drbd_device
*device
;
5850 struct p_block_ack
*p
= pi
->data
;
5852 peer_device
= conn_peer_device(connection
, pi
->vnr
);
5855 device
= peer_device
->device
;
5857 sector
= be64_to_cpu(p
->sector
);
5858 size
= be32_to_cpu(p
->blksize
);
5860 update_peer_seq(peer_device
, be32_to_cpu(p
->seq_num
));
5862 dec_rs_pending(device
);
5864 if (get_ldev_if_state(device
, D_FAILED
)) {
5865 drbd_rs_complete_io(device
, sector
);
5867 case P_NEG_RS_DREPLY
:
5868 drbd_rs_failed_io(device
, sector
, size
);
5880 static int got_BarrierAck(struct drbd_connection
*connection
, struct packet_info
*pi
)
5882 struct p_barrier_ack
*p
= pi
->data
;
5883 struct drbd_peer_device
*peer_device
;
5886 tl_release(connection
, p
->barrier
, be32_to_cpu(p
->set_size
));
5889 idr_for_each_entry(&connection
->peer_devices
, peer_device
, vnr
) {
5890 struct drbd_device
*device
= peer_device
->device
;
5892 if (device
->state
.conn
== C_AHEAD
&&
5893 atomic_read(&device
->ap_in_flight
) == 0 &&
5894 !test_and_set_bit(AHEAD_TO_SYNC_SOURCE
, &device
->flags
)) {
5895 device
->start_resync_timer
.expires
= jiffies
+ HZ
;
5896 add_timer(&device
->start_resync_timer
);
5904 static int got_OVResult(struct drbd_connection
*connection
, struct packet_info
*pi
)
5906 struct drbd_peer_device
*peer_device
;
5907 struct drbd_device
*device
;
5908 struct p_block_ack
*p
= pi
->data
;
5909 struct drbd_device_work
*dw
;
5913 peer_device
= conn_peer_device(connection
, pi
->vnr
);
5916 device
= peer_device
->device
;
5918 sector
= be64_to_cpu(p
->sector
);
5919 size
= be32_to_cpu(p
->blksize
);
5921 update_peer_seq(peer_device
, be32_to_cpu(p
->seq_num
));
5923 if (be64_to_cpu(p
->block_id
) == ID_OUT_OF_SYNC
)
5924 drbd_ov_out_of_sync_found(device
, sector
, size
);
5926 ov_out_of_sync_print(device
);
5928 if (!get_ldev(device
))
5931 drbd_rs_complete_io(device
, sector
);
5932 dec_rs_pending(device
);
5936 /* let's advance progress step marks only for every other megabyte */
5937 if ((device
->ov_left
& 0x200) == 0x200)
5938 drbd_advance_rs_marks(device
, device
->ov_left
);
5940 if (device
->ov_left
== 0) {
5941 dw
= kmalloc(sizeof(*dw
), GFP_NOIO
);
5943 dw
->w
.cb
= w_ov_finished
;
5944 dw
->device
= device
;
5945 drbd_queue_work(&peer_device
->connection
->sender_work
, &dw
->w
);
5947 drbd_err(device
, "kmalloc(dw) failed.");
5948 ov_out_of_sync_print(device
);
5949 drbd_resync_finished(device
);
/* got_skip() - sink for meta packets we deliberately ignore
 * (e.g. P_DELAY_PROBE in the dispatch table below). */
static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
{
	return 0;
}
5961 struct meta_sock_cmd
{
5963 int (*fn
)(struct drbd_connection
*connection
, struct packet_info
*);
5966 static void set_rcvtimeo(struct drbd_connection
*connection
, bool ping_timeout
)
5969 struct net_conf
*nc
;
5972 nc
= rcu_dereference(connection
->net_conf
);
5973 t
= ping_timeout
? nc
->ping_timeo
: nc
->ping_int
;
5980 connection
->meta
.socket
->sk
->sk_rcvtimeo
= t
;
/* set_ping_timeout() - use the short ping timeout while a PingAck is due. */
static void set_ping_timeout(struct drbd_connection *connection)
{
	set_rcvtimeo(connection, 1);
}
/* set_idle_timeout() - back to the long idle (ping interval) timeout. */
static void set_idle_timeout(struct drbd_connection *connection)
{
	set_rcvtimeo(connection, 0);
}
5993 static struct meta_sock_cmd ack_receiver_tbl
[] = {
5994 [P_PING
] = { 0, got_Ping
},
5995 [P_PING_ACK
] = { 0, got_PingAck
},
5996 [P_RECV_ACK
] = { sizeof(struct p_block_ack
), got_BlockAck
},
5997 [P_WRITE_ACK
] = { sizeof(struct p_block_ack
), got_BlockAck
},
5998 [P_RS_WRITE_ACK
] = { sizeof(struct p_block_ack
), got_BlockAck
},
5999 [P_SUPERSEDED
] = { sizeof(struct p_block_ack
), got_BlockAck
},
6000 [P_NEG_ACK
] = { sizeof(struct p_block_ack
), got_NegAck
},
6001 [P_NEG_DREPLY
] = { sizeof(struct p_block_ack
), got_NegDReply
},
6002 [P_NEG_RS_DREPLY
] = { sizeof(struct p_block_ack
), got_NegRSDReply
},
6003 [P_OV_RESULT
] = { sizeof(struct p_block_ack
), got_OVResult
},
6004 [P_BARRIER_ACK
] = { sizeof(struct p_barrier_ack
), got_BarrierAck
},
6005 [P_STATE_CHG_REPLY
] = { sizeof(struct p_req_state_reply
), got_RqSReply
},
6006 [P_RS_IS_IN_SYNC
] = { sizeof(struct p_block_ack
), got_IsInSync
},
6007 [P_DELAY_PROBE
] = { sizeof(struct p_delay_probe93
), got_skip
},
6008 [P_RS_CANCEL
] = { sizeof(struct p_block_ack
), got_NegRSDReply
},
6009 [P_CONN_ST_CHG_REPLY
]={ sizeof(struct p_req_state_reply
), got_conn_RqSReply
},
6010 [P_RETRY_WRITE
] = { sizeof(struct p_block_ack
), got_BlockAck
},
6013 int drbd_ack_receiver(struct drbd_thread
*thi
)
6015 struct drbd_connection
*connection
= thi
->connection
;
6016 struct meta_sock_cmd
*cmd
= NULL
;
6017 struct packet_info pi
;
6018 unsigned long pre_recv_jif
;
6020 void *buf
= connection
->meta
.rbuf
;
6022 unsigned int header_size
= drbd_header_size(connection
);
6023 int expect
= header_size
;
6024 bool ping_timeout_active
= false;
6025 struct sched_param param
= { .sched_priority
= 2 };
6027 rv
= sched_setscheduler(current
, SCHED_RR
, ¶m
);
6029 drbd_err(connection
, "drbd_ack_receiver: ERROR set priority, ret=%d\n", rv
);
6031 while (get_t_state(thi
) == RUNNING
) {
6032 drbd_thread_current_set_cpu(thi
);
6034 conn_reclaim_net_peer_reqs(connection
);
6036 if (test_and_clear_bit(SEND_PING
, &connection
->flags
)) {
6037 if (drbd_send_ping(connection
)) {
6038 drbd_err(connection
, "drbd_send_ping has failed\n");
6041 set_ping_timeout(connection
);
6042 ping_timeout_active
= true;
6045 pre_recv_jif
= jiffies
;
6046 rv
= drbd_recv_short(connection
->meta
.socket
, buf
, expect
-received
, 0);
6049 * -EINTR (on meta) we got a signal
6050 * -EAGAIN (on meta) rcvtimeo expired
6051 * -ECONNRESET other side closed the connection
6052 * -ERESTARTSYS (on data) we got a signal
6053 * rv < 0 other than above: unexpected error!
6054 * rv == expected: full header or command
6055 * rv < expected: "woken" by signal during receive
6056 * rv == 0 : "connection shut down by peer"
6058 if (likely(rv
> 0)) {
6061 } else if (rv
== 0) {
6062 if (test_bit(DISCONNECT_SENT
, &connection
->flags
)) {
6065 t
= rcu_dereference(connection
->net_conf
)->ping_timeo
* HZ
/10;
6068 t
= wait_event_timeout(connection
->ping_wait
,
6069 connection
->cstate
< C_WF_REPORT_PARAMS
,
6074 drbd_err(connection
, "meta connection shut down by peer.\n");
6076 } else if (rv
== -EAGAIN
) {
6077 /* If the data socket received something meanwhile,
6078 * that is good enough: peer is still alive. */
6079 if (time_after(connection
->last_received
, pre_recv_jif
))
6081 if (ping_timeout_active
) {
6082 drbd_err(connection
, "PingAck did not arrive in time.\n");
6085 set_bit(SEND_PING
, &connection
->flags
);
6087 } else if (rv
== -EINTR
) {
6088 /* maybe drbd_thread_stop(): the while condition will notice.
6089 * maybe woken for send_ping: we'll send a ping above,
6090 * and change the rcvtimeo */
6091 flush_signals(current
);
6094 drbd_err(connection
, "sock_recvmsg returned %d\n", rv
);
6098 if (received
== expect
&& cmd
== NULL
) {
6099 if (decode_header(connection
, connection
->meta
.rbuf
, &pi
))
6101 cmd
= &ack_receiver_tbl
[pi
.cmd
];
6102 if (pi
.cmd
>= ARRAY_SIZE(ack_receiver_tbl
) || !cmd
->fn
) {
6103 drbd_err(connection
, "Unexpected meta packet %s (0x%04x)\n",
6104 cmdname(pi
.cmd
), pi
.cmd
);
6107 expect
= header_size
+ cmd
->pkt_size
;
6108 if (pi
.size
!= expect
- header_size
) {
6109 drbd_err(connection
, "Wrong packet size on meta (c: %d, l: %d)\n",
6114 if (received
== expect
) {
6117 err
= cmd
->fn(connection
, &pi
);
6119 drbd_err(connection
, "%pf failed\n", cmd
->fn
);
6123 connection
->last_received
= jiffies
;
6125 if (cmd
== &ack_receiver_tbl
[P_PING_ACK
]) {
6126 set_idle_timeout(connection
);
6127 ping_timeout_active
= false;
6130 buf
= connection
->meta
.rbuf
;
6132 expect
= header_size
;
6139 conn_request_state(connection
, NS(conn
, C_NETWORK_FAILURE
), CS_HARD
);
6140 conn_md_sync(connection
);
6144 conn_request_state(connection
, NS(conn
, C_DISCONNECTING
), CS_HARD
);
6147 drbd_info(connection
, "ack_receiver terminated\n");
6152 void drbd_send_acks_wf(struct work_struct
*ws
)
6154 struct drbd_peer_device
*peer_device
=
6155 container_of(ws
, struct drbd_peer_device
, send_acks_work
);
6156 struct drbd_connection
*connection
= peer_device
->connection
;
6157 struct drbd_device
*device
= peer_device
->device
;
6158 struct net_conf
*nc
;
6162 nc
= rcu_dereference(connection
->net_conf
);
6163 tcp_cork
= nc
->tcp_cork
;
6167 drbd_tcp_cork(connection
->meta
.socket
);
6169 err
= drbd_finish_peer_reqs(device
);
6170 kref_put(&device
->kref
, drbd_destroy_device
);
6171 /* get is in drbd_endio_write_sec_final(). That is necessary to keep the
6172 struct work_struct send_acks_work alive, which is in the peer_device object */
6175 conn_request_state(connection
, NS(conn
, C_NETWORK_FAILURE
), CS_HARD
);
6180 drbd_tcp_uncork(connection
->meta
.socket
);