/*
   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */
#include <linux/module.h>

#include <asm/uaccess.h>

#include <linux/drbd.h>
#include <linux/file.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/pkt_sched.h>
#define __KERNEL_SYSCALLS__
#include <linux/unistd.h>
#include <linux/vmalloc.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>
#include "drbd_protocol.h"
#define PRO_FEATURES (FF_TRIM)
static int drbd_do_features(struct drbd_connection *connection);
static int drbd_do_auth(struct drbd_connection *connection);
static int drbd_disconnected(struct drbd_peer_device *);
static void conn_wait_active_ee_empty(struct drbd_connection *connection);
static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
static int e_end_block(struct drbd_work *, int);
#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
/*
 * some helper functions to deal with single linked page lists,
 * page->private being our "next" pointer.
 */
/* If at least n pages are linked at head, get n pages off.
 * Otherwise, don't modify head, and return NULL.
 * Locking is the responsibility of the caller.
 */
static struct page *page_chain_del(struct page **head, int n)
{
	struct page *page;
	struct page *tmp;

	BUG_ON(!n);
	BUG_ON(!head);

	page = *head;
	if (!page)
		return NULL;

	while (page) {
		tmp = page_chain_next(page);
		if (--n == 0)
			break; /* found sufficient pages */
		if (tmp == NULL)
			/* insufficient pages, don't use any of them. */
			return NULL;
		page = tmp;
	}

	/* add end of list marker for the returned list */
	set_page_private(page, 0);
	/* actual return value, and adjustment of head */
	page = *head;
	*head = tmp;
	return page;
}
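
/*
 * Illustration (not part of the driver; a minimal sketch): the pool is a
 * singly linked list threaded through page->private, so walking it costs no
 * extra allocation. page_chain_next() is just a cast of page_private(page),
 * and a 0 in page->private terminates the chain:
 *
 *	struct page *p;
 *	int vacant = 0;
 *	for (p = drbd_pp_pool; p; p = page_chain_next(p))
 *		vacant++;	// count pages currently parked in the pool
 */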
/* may be used outside of locks to find the tail of a (usually short)
 * "private" page chain, before adding it back to a global chain head
 * with page_chain_add() under a spinlock. */
static struct page *page_chain_tail(struct page *page, int *len)
{
	struct page *tmp;
	int i = 1;

	while ((tmp = page_chain_next(page)))
		++i, page = tmp;
	if (len)
		*len = i;
	return page;
}
static int page_chain_free(struct page *page)
{
	struct page *tmp;
	int i = 0;

	page_chain_for_each_safe(page, tmp) {
		put_page(page);
		++i;
	}
	return i;
}
static void page_chain_add(struct page **head,
		struct page *chain_first, struct page *chain_last)
{
#if 1
	struct page *tmp;
	tmp = page_chain_tail(chain_first, NULL);
	BUG_ON(tmp != chain_last);
#endif

	/* add chain to head */
	set_page_private(chain_last, (unsigned long)*head);
	*head = chain_first;
}
static struct page *__drbd_alloc_pages(struct drbd_device *device,
				       unsigned int number)
{
	struct page *page = NULL;
	struct page *tmp = NULL;
	unsigned int i = 0;

	/* Yes, testing drbd_pp_vacant outside the lock is racy.
	 * So what. It saves a spin_lock. */
	if (drbd_pp_vacant >= number) {
		spin_lock(&drbd_pp_lock);
		page = page_chain_del(&drbd_pp_pool, number);
		if (page)
			drbd_pp_vacant -= number;
		spin_unlock(&drbd_pp_lock);
		if (page)
			return page;
	}

	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	for (i = 0; i < number; i++) {
		tmp = alloc_page(GFP_TRY);
		if (!tmp)
			break;
		set_page_private(tmp, (unsigned long)page);
		page = tmp;
	}

	if (i == number)
		return page;

	/* Not enough pages immediately available this time.
	 * No need to jump around here, drbd_alloc_pages will retry this
	 * function "soon". */
	if (page) {
		tmp = page_chain_tail(page, NULL);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		drbd_pp_vacant += i;
		spin_unlock(&drbd_pp_lock);
	}
	return NULL;
}
static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
					   struct list_head *to_be_freed)
{
	struct drbd_peer_request *peer_req, *tmp;

	/* The EEs are always appended to the end of the list. Since
	   they are sent in order over the wire, they have to finish
	   in order. As soon as we see the first not finished we can
	   stop to examine the list... */

	list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
		if (drbd_peer_req_has_active_page(peer_req))
			break;
		list_move(&peer_req->w.list, to_be_freed);
	}
}
static void drbd_reclaim_net_peer_reqs(struct drbd_device *device)
{
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;

	spin_lock_irq(&device->resource->req_lock);
	reclaim_finished_net_peer_reqs(device, &reclaimed);
	spin_unlock_irq(&device->resource->req_lock);
	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_peer_req(device, peer_req);
}
static void conn_reclaim_net_peer_reqs(struct drbd_connection *connection)
{
	struct drbd_peer_device *peer_device;
	int vnr;

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;

		if (!atomic_read(&device->pp_in_use_by_net))
			continue;

		kref_get(&device->kref);
		rcu_read_unlock();
		drbd_reclaim_net_peer_reqs(device);
		kref_put(&device->kref, drbd_destroy_device);
		rcu_read_lock();
	}
	rcu_read_unlock();
}
/**
 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
 * @device:	DRBD device.
 * @number:	number of pages requested
 * @retry:	whether to retry, if not enough pages are available right now
 *
 * Tries to allocate number pages, first from our own page pool, then from
 * the kernel.
 * Possibly retry until DRBD frees sufficient pages somewhere else.
 *
 * If this allocation would exceed the max_buffers setting, we throttle
 * allocation (schedule_timeout) to give the system some room to breathe.
 *
 * We do not use max-buffers as hard limit, because it could lead to
 * congestion and further to a distributed deadlock during online-verify or
 * (checksum based) resync, if the max-buffers, socket buffer sizes and
 * resync-rate settings are mis-configured.
 *
 * Returns a page chain linked via page->private.
 */
struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
			      bool retry)
{
	struct drbd_device *device = peer_device->device;
	struct page *page = NULL;
	struct net_conf *nc;
	DEFINE_WAIT(wait);
	unsigned int mxb;

	rcu_read_lock();
	nc = rcu_dereference(peer_device->connection->net_conf);
	mxb = nc ? nc->max_buffers : 1000000;
	rcu_read_unlock();

	if (atomic_read(&device->pp_in_use) < mxb)
		page = __drbd_alloc_pages(device, number);

	/* Try to keep the fast path fast, but occasionally we need
	 * to reclaim the pages we lent to the network stack. */
	if (page && atomic_read(&device->pp_in_use_by_net) > 512)
		drbd_reclaim_net_peer_reqs(device);

	while (page == NULL) {
		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);

		drbd_reclaim_net_peer_reqs(device);

		if (atomic_read(&device->pp_in_use) < mxb) {
			page = __drbd_alloc_pages(device, number);
			if (page)
				break;
		}

		if (!retry)
			break;

		if (signal_pending(current)) {
			drbd_warn(device, "drbd_alloc_pages interrupted!\n");
			break;
		}

		if (schedule_timeout(HZ/10) == 0)
			mxb = UINT_MAX;
	}
	finish_wait(&drbd_pp_wait, &wait);

	if (page)
		atomic_add(number, &device->pp_in_use);
	return page;
}
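
/*
 * Illustration (not part of the driver; a sketch of the calling contract):
 * with retry=true this only returns NULL when interrupted by a signal, so a
 * caller treats NULL as "bail out, teardown follows", not as ordinary ENOMEM:
 *
 *	page = drbd_alloc_pages(peer_device, nr_pages, true);
 *	if (!page)
 *		return NULL;	// signalled; the connection will be torn down
 *
 * Throttling against max_buffers is soft on purpose: a hard limit could
 * deadlock distributed setups, per the comment above.
 */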
/* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
 * Is also used from inside another spin_lock_irq(&resource->req_lock);
 * Either links the page chain back to the global pool,
 * or returns all pages to the system. */
static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
{
	atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
	int i;

	if (page == NULL)
		return;

	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
		i = page_chain_free(page);
	else {
		struct page *tmp;
		tmp = page_chain_tail(page, &i);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		drbd_pp_vacant += i;
		spin_unlock(&drbd_pp_lock);
	}
	i = atomic_sub_return(i, a);
	if (i < 0)
		drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
	wake_up(&drbd_pp_wait);
}
/*
You need to hold the req_lock:
 _drbd_wait_ee_list_empty()

You must not have the req_lock:
 drbd_alloc_peer_req()
 drbd_free_peer_reqs()
 drbd_finish_peer_reqs()
 drbd_wait_ee_list_empty()
*/
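
/*
 * Illustration (not part of the driver): the locking contract above in code
 * form. _drbd_wait_ee_list_empty() expects the caller to hold the lock:
 *
 *	spin_lock_irq(&device->resource->req_lock);
 *	_drbd_wait_ee_list_empty(device, &device->active_ee);
 *	spin_unlock_irq(&device->resource->req_lock);
 *
 * whereas drbd_wait_ee_list_empty() takes and drops req_lock itself, so
 * calling it with the lock already held would deadlock.
 */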
struct drbd_peer_request *
drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
		    unsigned int data_size, bool has_payload, gfp_t gfp_mask) __must_hold(local)
{
	struct drbd_device *device = peer_device->device;
	struct drbd_peer_request *peer_req;
	struct page *page = NULL;
	unsigned nr_pages = (data_size + PAGE_SIZE - 1) >> PAGE_SHIFT;

	if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
		return NULL;

	peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
	if (!peer_req) {
		if (!(gfp_mask & __GFP_NOWARN))
			drbd_err(device, "%s: allocation failed\n", __func__);
		return NULL;
	}

	if (has_payload && data_size) {
		page = drbd_alloc_pages(peer_device, nr_pages,
					gfpflags_allow_blocking(gfp_mask));
		if (!page)
			goto fail;
	}

	memset(peer_req, 0, sizeof(*peer_req));
	INIT_LIST_HEAD(&peer_req->w.list);
	drbd_clear_interval(&peer_req->i);
	peer_req->i.size = data_size;
	peer_req->i.sector = sector;
	peer_req->submit_jif = jiffies;
	peer_req->peer_device = peer_device;
	peer_req->pages = page;
	/*
	 * The block_id is opaque to the receiver.  It is not endianness
	 * converted, and sent back to the sender unchanged.
	 */
	peer_req->block_id = id;

	return peer_req;

 fail:
	mempool_free(peer_req, drbd_ee_mempool);
	return NULL;
}
void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
		       int is_net)
{
	if (peer_req->flags & EE_HAS_DIGEST)
		kfree(peer_req->digest);
	drbd_free_pages(device, peer_req->pages, is_net);
	D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
	if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
		peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
		drbd_al_complete_io(device, &peer_req->i);
	}
	mempool_free(peer_req, drbd_ee_mempool);
}
int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
{
	LIST_HEAD(work_list);
	struct drbd_peer_request *peer_req, *t;
	int count = 0;
	int is_net = list == &device->net_ee;

	spin_lock_irq(&device->resource->req_lock);
	list_splice_init(list, &work_list);
	spin_unlock_irq(&device->resource->req_lock);

	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
		__drbd_free_peer_req(device, peer_req, is_net);
		count++;
	}
	return count;
}
/*
 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
 */
static int drbd_finish_peer_reqs(struct drbd_device *device)
{
	LIST_HEAD(work_list);
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;
	int err = 0;

	spin_lock_irq(&device->resource->req_lock);
	reclaim_finished_net_peer_reqs(device, &reclaimed);
	list_splice_init(&device->done_ee, &work_list);
	spin_unlock_irq(&device->resource->req_lock);

	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_peer_req(device, peer_req);

	/* possible callbacks here:
	 * e_end_block, and e_end_resync_block, e_send_superseded.
	 * all ignore the last argument.
	 */
	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
		int err2;

		/* list_del not necessary, next/prev members not touched */
		err2 = peer_req->w.cb(&peer_req->w, !!err);
		if (!err)
			err = err2;
		drbd_free_peer_req(device, peer_req);
	}
	wake_up(&device->ee_wait);

	return err;
}
static void _drbd_wait_ee_list_empty(struct drbd_device *device,
				     struct list_head *head)
{
	DEFINE_WAIT(wait);

	/* avoids spin_lock/unlock
	 * and calling prepare_to_wait in the fast path */
	while (!list_empty(head)) {
		prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
		spin_unlock_irq(&device->resource->req_lock);
		io_schedule();
		finish_wait(&device->ee_wait, &wait);
		spin_lock_irq(&device->resource->req_lock);
	}
}
static void drbd_wait_ee_list_empty(struct drbd_device *device,
				    struct list_head *head)
{
	spin_lock_irq(&device->resource->req_lock);
	_drbd_wait_ee_list_empty(device, head);
	spin_unlock_irq(&device->resource->req_lock);
}
static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
{
	struct kvec iov = {
		.iov_base = buf,
		.iov_len = size,
	};
	struct msghdr msg = {
		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
	};
	return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags);
}
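
/*
 * Illustration (not part of the driver): with flags == 0, MSG_WAITALL makes
 * kernel_recvmsg() block until the full size has arrived (or the connection
 * ends, or a signal interrupts), so the common call
 *
 *	rv = drbd_recv_short(sock, buf, size, 0);
 *
 * normally returns size, 0 on orderly shutdown, or a negative errno. Short
 * positive reads are mainly expected with explicit flags such as
 * MSG_DONTWAIT | MSG_PEEK, as used by drbd_socket_okay() below.
 */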
static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
{
	int rv;

	rv = drbd_recv_short(connection->data.socket, buf, size, 0);

	if (rv < 0) {
		if (rv == -ECONNRESET)
			drbd_info(connection, "sock was reset by peer\n");
		else if (rv != -ERESTARTSYS)
			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
	} else if (rv == 0) {
		if (test_bit(DISCONNECT_SENT, &connection->flags)) {
			long t;

			rcu_read_lock();
			t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
			rcu_read_unlock();

			t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
			if (t)
				goto out;
		}
		drbd_info(connection, "sock was shut down by peer\n");
	}

	if (rv != size)
		conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);

out:
	return rv;
}
static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
{
	int err;

	err = drbd_recv(connection, buf, size);
	if (err != size) {
		if (err >= 0)
			err = -EIO;
	} else
		err = 0;
	return err;
}
static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
{
	int err;

	err = drbd_recv_all(connection, buf, size);
	if (err && !signal_pending(current))
		drbd_warn(connection, "short read (expected size %d)\n", (int)size);
	return err;
}
/*
 * On individual connections, the socket buffer size must be set prior to the
 * listen(2) or connect(2) calls in order to have it take effect.
 * This is our wrapper to do so.
 */
static void drbd_setbufsize(struct socket *sock, unsigned int snd,
			    unsigned int rcv)
{
	/* open coded SO_SNDBUF, SO_RCVBUF */
	if (snd) {
		sock->sk->sk_sndbuf = snd;
		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
	}
	if (rcv) {
		sock->sk->sk_rcvbuf = rcv;
		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
	}
}
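
/*
 * Illustration (not part of the driver): setting sk_sndbuf/sk_rcvbuf plus
 * the SOCK_*BUF_LOCK bits by hand is the in-kernel equivalent of what
 * setsockopt(2) does for user space:
 *
 *	int sz = snd;
 *	setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sz, sizeof(sz));
 *
 * The lock bits keep TCP auto-tuning from resizing the buffers later; the
 * sizes must already be in place before listen(2)/connect(2) so they can
 * influence the negotiated TCP window.
 */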
static struct socket *drbd_try_connect(struct drbd_connection *connection)
{
	const char *what;
	struct socket *sock;
	struct sockaddr_in6 src_in6;
	struct sockaddr_in6 peer_in6;
	struct net_conf *nc;
	int err, peer_addr_len, my_addr_len;
	int sndbuf_size, rcvbuf_size, connect_int;
	int disconnect_on_error = 1;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return NULL;
	}
	sndbuf_size = nc->sndbuf_size;
	rcvbuf_size = nc->rcvbuf_size;
	connect_int = nc->connect_int;
	rcu_read_unlock();

	my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
	memcpy(&src_in6, &connection->my_addr, my_addr_len);

	if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
		src_in6.sin6_port = 0;
	else
		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

	peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
	memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);

	what = "sock_create_kern";
	err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &sock);
	if (err < 0) {
		sock = NULL;
		goto out;
	}

	sock->sk->sk_rcvtimeo =
	sock->sk->sk_sndtimeo = connect_int * HZ;
	drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);

	/* explicitly bind to the configured IP as source IP
	 * for the outgoing connections.
	 * This is needed for multihomed hosts and to be
	 * able to use lo: interfaces for drbd.
	 * Make sure to use 0 as port number, so linux selects
	 * a free one dynamically.
	 */
	what = "bind before connect";
	err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
	if (err < 0)
		goto out;

	/* connect may fail, peer not yet available.
	 * stay C_WF_CONNECTION, don't go Disconnecting! */
	disconnect_on_error = 0;
	what = "connect";
	err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);

out:
	if (err < 0) {
		if (sock) {
			sock_release(sock);
			sock = NULL;
		}
		switch (-err) {
			/* timeout, busy, signal pending */
		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
		case EINTR: case ERESTARTSYS:
			/* peer not (yet) available, network problem */
		case ECONNREFUSED: case ENETUNREACH:
		case EHOSTDOWN:    case EHOSTUNREACH:
			disconnect_on_error = 0;
			break;
		default:
			drbd_err(connection, "%s failed, err = %d\n", what, err);
		}
		if (disconnect_on_error)
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
	}

	return sock;
}
struct accept_wait_data {
	struct drbd_connection *connection;
	struct socket *s_listen;
	struct completion door_bell;
	void (*original_sk_state_change)(struct sock *sk);

};
static void drbd_incoming_connection(struct sock *sk)
{
	struct accept_wait_data *ad = sk->sk_user_data;
	void (*state_change)(struct sock *sk);

	state_change = ad->original_sk_state_change;
	if (sk->sk_state == TCP_ESTABLISHED)
		complete(&ad->door_bell);
	state_change(sk);
}
static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
{
	int err, sndbuf_size, rcvbuf_size, my_addr_len;
	struct sockaddr_in6 my_addr;
	struct socket *s_listen;
	struct net_conf *nc;
	const char *what;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return -EIO;
	}
	sndbuf_size = nc->sndbuf_size;
	rcvbuf_size = nc->rcvbuf_size;
	rcu_read_unlock();

	my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
	memcpy(&my_addr, &connection->my_addr, my_addr_len);

	what = "sock_create_kern";
	err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &s_listen);
	if (err) {
		s_listen = NULL;
		goto out;
	}

	s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
	drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);

	what = "bind before listen";
	err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
	if (err < 0)
		goto out;

	ad->s_listen = s_listen;
	write_lock_bh(&s_listen->sk->sk_callback_lock);
	ad->original_sk_state_change = s_listen->sk->sk_state_change;
	s_listen->sk->sk_state_change = drbd_incoming_connection;
	s_listen->sk->sk_user_data = ad;
	write_unlock_bh(&s_listen->sk->sk_callback_lock);

	what = "listen";
	err = s_listen->ops->listen(s_listen, 5);
	if (err < 0)
		goto out;

	return 0;
out:
	if (s_listen)
		sock_release(s_listen);
	if (err < 0) {
		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
			drbd_err(connection, "%s failed, err = %d\n", what, err);
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
		}
	}

	return -EIO;
}
static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
{
	write_lock_bh(&sk->sk_callback_lock);
	sk->sk_state_change = ad->original_sk_state_change;
	sk->sk_user_data = NULL;
	write_unlock_bh(&sk->sk_callback_lock);
}
static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
{
	int timeo, connect_int, err = 0;
	struct socket *s_estab = NULL;
	struct net_conf *nc;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return NULL;
	}
	connect_int = nc->connect_int;
	rcu_read_unlock();

	timeo = connect_int * HZ;
	/* 28.5% random jitter */
	timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;

	err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
	if (err <= 0)
		return NULL;

	err = kernel_accept(ad->s_listen, &s_estab, 0);
	if (err < 0) {
		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
			drbd_err(connection, "accept failed, err = %d\n", err);
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
		}
	}

	if (s_estab)
		unregister_state_change(s_estab->sk, ad);

	return s_estab;
}
static int decode_header(struct drbd_connection *, void *, struct packet_info *);
static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
			     enum drbd_packet cmd)
{
	if (!conn_prepare_command(connection, sock))
		return -EIO;
	return conn_send_command(connection, sock, cmd, 0, NULL, 0);
}
static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
{
	unsigned int header_size = drbd_header_size(connection);
	struct packet_info pi;
	struct net_conf *nc;
	int err;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return -EIO;
	}
	sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
	rcu_read_unlock();

	err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
	if (err != header_size) {
		if (err >= 0)
			err = -EIO;
		return err;
	}
	err = decode_header(connection, connection->data.rbuf, &pi);
	if (err)
		return err;
	return pi.cmd;
}
/**
 * drbd_socket_okay() - Free the socket if its connection is not okay
 * @sock:	pointer to the pointer to the socket.
 */
static bool drbd_socket_okay(struct socket **sock)
{
	int rr;
	char tb[4];

	if (!*sock)
		return false;

	rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);

	if (rr > 0 || rr == -EAGAIN) {
		return true;
	} else {
		sock_release(*sock);
		*sock = NULL;
		return false;
	}
}
static bool connection_established(struct drbd_connection *connection,
				   struct socket **sock1,
				   struct socket **sock2)
{
	struct net_conf *nc;
	int timeout;
	bool ok;

	if (!*sock1 || !*sock2)
		return false;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
	rcu_read_unlock();
	schedule_timeout_interruptible(timeout);

	ok = drbd_socket_okay(sock1);
	ok = drbd_socket_okay(sock2) && ok;

	return ok;
}
/* Gets called if a connection is established, or if a new minor gets created
   in a connection */
int drbd_connected(struct drbd_peer_device *peer_device)
{
	struct drbd_device *device = peer_device->device;
	int err;

	atomic_set(&device->packet_seq, 0);
	device->peer_seq = 0;

	device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
		&peer_device->connection->cstate_mutex :
		&device->own_state_mutex;

	err = drbd_send_sync_param(peer_device);
	if (!err)
		err = drbd_send_sizes(peer_device, 0, 0);
	if (!err)
		err = drbd_send_uuids(peer_device);
	if (!err)
		err = drbd_send_current_state(peer_device);
	clear_bit(USE_DEGR_WFC_T, &device->flags);
	clear_bit(RESIZE_PENDING, &device->flags);
	atomic_set(&device->ap_in_flight, 0);
	mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
	return err;
}
/*
 * return values:
 *   1 yes, we have a valid connection
 *   0 oops, did not work out, please try again
 *  -1 peer talks different language,
 *     no point in trying again, please go standalone.
 *  -2 We do not have a network config...
 */
static int conn_connect(struct drbd_connection *connection)
{
	struct drbd_socket sock, msock;
	struct drbd_peer_device *peer_device;
	struct net_conf *nc;
	int vnr, timeout, h;
	bool discard_my_data, ok;
	enum drbd_state_rv rv;
	struct accept_wait_data ad = {
		.connection = connection,
		.door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
	};

	clear_bit(DISCONNECT_SENT, &connection->flags);
	if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
		return -2;

	mutex_init(&sock.mutex);
	sock.sbuf = connection->data.sbuf;
	sock.rbuf = connection->data.rbuf;
	sock.socket = NULL;
	mutex_init(&msock.mutex);
	msock.sbuf = connection->meta.sbuf;
	msock.rbuf = connection->meta.rbuf;
	msock.socket = NULL;

	/* Assume that the peer only understands protocol 80 until we know better.  */
	connection->agreed_pro_version = 80;

	if (prepare_listen_socket(connection, &ad))
		return 0;

	do {
		struct socket *s;

		s = drbd_try_connect(connection);
		if (s) {
			if (!sock.socket) {
				sock.socket = s;
				send_first_packet(connection, &sock, P_INITIAL_DATA);
			} else if (!msock.socket) {
				clear_bit(RESOLVE_CONFLICTS, &connection->flags);
				msock.socket = s;
				send_first_packet(connection, &msock, P_INITIAL_META);
			} else {
				drbd_err(connection, "Logic error in conn_connect()\n");
				goto out_release_sockets;
			}
		}

		if (connection_established(connection, &sock.socket, &msock.socket))
			break;

retry:
		s = drbd_wait_for_connect(connection, &ad);
		if (s) {
			int fp = receive_first_packet(connection, s);
			drbd_socket_okay(&sock.socket);
			drbd_socket_okay(&msock.socket);
			switch (fp) {
			case P_INITIAL_DATA:
				if (sock.socket) {
					drbd_warn(connection, "initial packet S crossed\n");
					sock_release(sock.socket);
					sock.socket = s;
					goto randomize;
				}
				sock.socket = s;
				break;
			case P_INITIAL_META:
				set_bit(RESOLVE_CONFLICTS, &connection->flags);
				if (msock.socket) {
					drbd_warn(connection, "initial packet M crossed\n");
					sock_release(msock.socket);
					msock.socket = s;
					goto randomize;
				}
				msock.socket = s;
				break;
			default:
				drbd_warn(connection, "Error receiving initial packet\n");
				sock_release(s);
randomize:
				if (prandom_u32() & 1)
					goto retry;
			}
		}

		if (connection->cstate <= C_DISCONNECTING)
			goto out_release_sockets;
		if (signal_pending(current)) {
			flush_signals(current);
			smp_rmb();
			if (get_t_state(&connection->receiver) == EXITING)
				goto out_release_sockets;
		}

		ok = connection_established(connection, &sock.socket, &msock.socket);
	} while (!ok);

	if (ad.s_listen)
		sock_release(ad.s_listen);

	sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
	msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */

	sock.socket->sk->sk_allocation = GFP_NOIO;
	msock.socket->sk->sk_allocation = GFP_NOIO;

	sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
	msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;

	/* NOT YET ...
	 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
	 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
	 * first set it to the P_CONNECTION_FEATURES timeout,
	 * which we set to 4x the configured ping_timeout. */
	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);

	sock.socket->sk->sk_sndtimeo =
	sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;

	msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
	timeout = nc->timeout * HZ / 10;
	discard_my_data = nc->discard_my_data;
	rcu_read_unlock();

	msock.socket->sk->sk_sndtimeo = timeout;

	/* we don't want delays.
	 * we use TCP_CORK where appropriate, though */
	drbd_tcp_nodelay(sock.socket);
	drbd_tcp_nodelay(msock.socket);

	connection->data.socket = sock.socket;
	connection->meta.socket = msock.socket;
	connection->last_received = jiffies;

	h = drbd_do_features(connection);
	if (h <= 0)
		return h;

	if (connection->cram_hmac_tfm) {
		/* drbd_request_state(device, NS(conn, WFAuth)); */
		switch (drbd_do_auth(connection)) {
		case -1:
			drbd_err(connection, "Authentication of peer failed\n");
			return -1;
		case 0:
			drbd_err(connection, "Authentication of peer failed, trying again.\n");
			return 0;
		}
	}

	connection->data.socket->sk->sk_sndtimeo = timeout;
	connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;

	if (drbd_send_protocol(connection) == -EOPNOTSUPP)
		return -1;

	/* Prevent a race between resync-handshake and
	 * being promoted to Primary.
	 *
	 * Grab and release the state mutex, so we know that any current
	 * drbd_set_role() is finished, and any incoming drbd_set_role
	 * will see the STATE_SENT flag, and wait for it to be cleared.
	 */
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
		mutex_lock(peer_device->device->state_mutex);

	set_bit(STATE_SENT, &connection->flags);

	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
		mutex_unlock(peer_device->device->state_mutex);

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;
		kref_get(&device->kref);
		rcu_read_unlock();

		if (discard_my_data)
			set_bit(DISCARD_MY_DATA, &device->flags);
		else
			clear_bit(DISCARD_MY_DATA, &device->flags);

		drbd_connected(peer_device);
		kref_put(&device->kref, drbd_destroy_device);
		rcu_read_lock();
	}
	rcu_read_unlock();

	rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
	if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
		clear_bit(STATE_SENT, &connection->flags);
		return 0;
	}

	drbd_thread_start(&connection->ack_receiver);
	/* opencoded create_singlethread_workqueue(),
	 * to be able to use format string arguments */
	connection->ack_sender =
		alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name);
	if (!connection->ack_sender) {
		drbd_err(connection, "Failed to create workqueue ack_sender\n");
		return 0;
	}

	mutex_lock(&connection->resource->conf_update);
	/* The discard_my_data flag is a single-shot modifier to the next
	 * connection attempt, the handshake of which is now well underway.
	 * No need for rcu style copying of the whole struct
	 * just to clear a single value. */
	connection->net_conf->discard_my_data = 0;
	mutex_unlock(&connection->resource->conf_update);

	return h;

out_release_sockets:
	if (ad.s_listen)
		sock_release(ad.s_listen);
	if (sock.socket)
		sock_release(sock.socket);
	if (msock.socket)
		sock_release(msock.socket);
	return -1;
}
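
/*
 * Illustration (not part of the driver; a sketch loosely modeled on the
 * receiver thread, not its exact code): how a caller might act on the
 * return codes documented above conn_connect():
 *
 *	do {
 *		h = conn_connect(connection);
 *		if (h == 0)		// did not work out, retry soon
 *			schedule_timeout_interruptible(HZ);
 *		if (h == -1)		// incompatible peer: go standalone
 *			conn_request_state(connection,
 *					NS(conn, C_DISCONNECTING), CS_HARD);
 *	} while (h == 0);
 */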
static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
{
	unsigned int header_size = drbd_header_size(connection);

	if (header_size == sizeof(struct p_header100) &&
	    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
		struct p_header100 *h = header;
		if (h->pad != 0) {
			drbd_err(connection, "Header padding is not zero\n");
			return -EINVAL;
		}
		pi->vnr = be16_to_cpu(h->volume);
		pi->cmd = be16_to_cpu(h->command);
		pi->size = be32_to_cpu(h->length);
	} else if (header_size == sizeof(struct p_header95) &&
		   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
		struct p_header95 *h = header;
		pi->cmd = be16_to_cpu(h->command);
		pi->size = be32_to_cpu(h->length);
		pi->vnr = 0;
	} else if (header_size == sizeof(struct p_header80) &&
		   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
		struct p_header80 *h = header;
		pi->cmd = be16_to_cpu(h->command);
		pi->size = be16_to_cpu(h->length);
		pi->vnr = 0;
	} else {
		drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
			 be32_to_cpu(*(__be32 *)header),
			 connection->agreed_pro_version);
		return -EINVAL;
	}
	pi->data = header + header_size;
	return 0;
}
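
/*
 * Illustration (not part of the driver): the three on-wire header layouts
 * distinguished above, reconstructed from the field accesses (authoritative
 * definitions live in drbd_protocol.h):
 *
 *	h80:  be32 magic; be16 command; be16 length;
 *	h95:  be16 magic; be16 command; be32 length;		// "big" header
 *	h100: be32 magic; be16 volume; be16 command; be32 length; be32 pad;
 *
 * Only the h100 header carries a volume number, which is why pi->vnr stays
 * 0 for the older formats.
 */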
static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
{
	void *buffer = connection->data.rbuf;
	int err;

	err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
	if (err)
		return err;

	err = decode_header(connection, buffer, pi);
	connection->last_received = jiffies;

	return err;
}
static void drbd_flush(struct drbd_connection *connection)
{
	int rv;
	struct drbd_peer_device *peer_device;
	int vnr;

	if (connection->resource->write_ordering >= WO_BDEV_FLUSH) {
		rcu_read_lock();
		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
			struct drbd_device *device = peer_device->device;

			if (!get_ldev(device))
				continue;
			kref_get(&device->kref);
			rcu_read_unlock();

			/* Right now, we have only this one synchronous code path
			 * for flushes between request epochs.
			 * We may want to make those asynchronous,
			 * or at least parallelize the flushes to the volume devices.
			 */
			device->flush_jif = jiffies;
			set_bit(FLUSH_PENDING, &device->flags);
			rv = blkdev_issue_flush(device->ldev->backing_bdev,
					GFP_NOIO, NULL);
			clear_bit(FLUSH_PENDING, &device->flags);
			if (rv) {
				drbd_info(device, "local disk flush failed with status %d\n", rv);
				/* would rather check on EOPNOTSUPP, but that is not reliable.
				 * don't try again for ANY return value != 0
				 * if (rv == -EOPNOTSUPP) */
				drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
			}
			put_ldev(device);
			kref_put(&device->kref, drbd_destroy_device);

			rcu_read_lock();
			if (rv)
				break;
		}
		rcu_read_unlock();
	}
}
/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
 * @device:	DRBD device.
 * @epoch:	Epoch object.
 * @ev:		Epoch event.
 */
static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
					       struct drbd_epoch *epoch,
					       enum epoch_event ev)
{
	int epoch_size;
	struct drbd_epoch *next_epoch;
	enum finish_epoch rv = FE_STILL_LIVE;

	spin_lock(&connection->epoch_lock);
	do {
		next_epoch = NULL;

		epoch_size = atomic_read(&epoch->epoch_size);

		switch (ev & ~EV_CLEANUP) {
		case EV_PUT:
			atomic_dec(&epoch->active);
			break;
		case EV_GOT_BARRIER_NR:
			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
			break;
		case EV_BECAME_LAST:
			/* nothing to do*/
			break;
		}

		if (epoch_size != 0 &&
		    atomic_read(&epoch->active) == 0 &&
		    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
			if (!(ev & EV_CLEANUP)) {
				spin_unlock(&connection->epoch_lock);
				drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
				spin_lock(&connection->epoch_lock);
			}
#if 0
			/* FIXME: dec unacked on connection, once we have
			 * something to count pending connection packets in. */
			if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
				dec_unacked(epoch->connection);
#endif

			if (connection->current_epoch != epoch) {
				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
				list_del(&epoch->list);
				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
				connection->epochs--;
				kfree(epoch);

				if (rv == FE_STILL_LIVE)
					rv = FE_DESTROYED;
			} else {
				epoch->flags = 0;
				atomic_set(&epoch->epoch_size, 0);
				/* atomic_set(&epoch->active, 0); is already zero */
				if (rv == FE_STILL_LIVE)
					rv = FE_RECYCLED;
			}
		}

		if (!next_epoch)
			break;

		epoch = next_epoch;
	} while (1);

	spin_unlock(&connection->epoch_lock);

	return rv;
}
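
/*
 * Illustration (not part of the driver; a sketch of the event flow for one
 * write epoch): each completed replicated write "puts" the epoch, and the
 * barrier number arrives via receive_Barrier:
 *
 *	drbd_may_finish_epoch(conn, e, EV_GOT_BARRIER_NR);  // P_BARRIER seen
 *	drbd_may_finish_epoch(conn, e, EV_PUT);             // a write finished
 *
 * Once active == 0, epoch_size != 0 and the barrier number is known (or
 * EV_CLEANUP forces it), a P_BARRIER_ACK goes out and the epoch is either
 * recycled (if it is still current) or destroyed.
 */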
static enum write_ordering_e
max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
{
	struct disk_conf *dc;

	dc = rcu_dereference(bdev->disk_conf);

	if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
		wo = WO_DRAIN_IO;
	if (wo == WO_DRAIN_IO && !dc->disk_drain)
		wo = WO_NONE;

	return wo;
}
/**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
 * @connection:	DRBD connection.
 * @wo:		Write ordering method to try.
 */
void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
			      enum write_ordering_e wo)
{
	struct drbd_device *device;
	enum write_ordering_e pwo;
	int vnr;
	static char *write_ordering_str[] = {
		[WO_NONE] = "none",
		[WO_DRAIN_IO] = "drain",
		[WO_BDEV_FLUSH] = "flush",
	};

	pwo = resource->write_ordering;
	if (wo != WO_BDEV_FLUSH)
		wo = min(pwo, wo);
	rcu_read_lock();
	idr_for_each_entry(&resource->devices, device, vnr) {
		if (get_ldev(device)) {
			wo = max_allowed_wo(device->ldev, wo);
			if (device->ldev == bdev)
				bdev = NULL;
			put_ldev(device);
		}
	}

	if (bdev)
		wo = max_allowed_wo(bdev, wo);

	rcu_read_unlock();

	resource->write_ordering = wo;
	if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH)
		drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
}
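
/*
 * Illustration (not part of the driver): the fallback ladder in effect.
 * With disk_flushes disabled in the configuration, a requested "flush"
 * degrades step by step:
 *
 *	drbd_bump_write_ordering(resource, bdev, WO_BDEV_FLUSH);
 *	// max_allowed_wo(): WO_BDEV_FLUSH -> WO_DRAIN_IO  (no flushes)
 *	//                   WO_DRAIN_IO   -> WO_NONE      (no draining)
 *
 * Each backing device can only weaken the method, so the resource ends up
 * with the weakest setting required by any of its volumes.
 */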
/**
 * drbd_submit_peer_request()
 * @device:	DRBD device.
 * @peer_req:	peer request
 * @rw:		flag field, see bio->bi_rw
 *
 * May spread the pages to multiple bios,
 * depending on bio_add_page restrictions.
 *
 * Returns 0 if all bios have been submitted,
 * -ENOMEM if we could not allocate enough bios,
 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
 *  single page to an empty bio (which should never happen and likely indicates
 *  that the lower level IO stack is in some way broken). This has been observed
 *  on certain Xen deployments.
 */
/* TODO allocate from our own bio_set. */
int drbd_submit_peer_request(struct drbd_device *device,
			     struct drbd_peer_request *peer_req,
			     const unsigned rw, const int fault_type)
{
	struct bio *bios = NULL;
	struct bio *bio;
	struct page *page = peer_req->pages;
	sector_t sector = peer_req->i.sector;
	unsigned data_size = peer_req->i.size;
	unsigned n_bios = 0;
	unsigned nr_pages = (data_size + PAGE_SIZE - 1) >> PAGE_SHIFT;
	int err = -ENOMEM;

	if (peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) {
		/* wait for all pending IO completions, before we start
		 * zeroing things out. */
		conn_wait_active_ee_empty(peer_req->peer_device->connection);
		/* add it to the active list now,
		 * so we can find it to present it in debugfs */
		peer_req->submit_jif = jiffies;
		peer_req->flags |= EE_SUBMITTED;
		spin_lock_irq(&device->resource->req_lock);
		list_add_tail(&peer_req->w.list, &device->active_ee);
		spin_unlock_irq(&device->resource->req_lock);
		if (blkdev_issue_zeroout(device->ldev->backing_bdev,
			sector, data_size >> 9, GFP_NOIO, false))
			peer_req->flags |= EE_WAS_ERROR;
		drbd_endio_write_sec_final(peer_req);
		return 0;
	}

	/* Discards don't have any payload.
	 * But the scsi layer still expects a bio_vec it can use internally,
	 * see sd_setup_discard_cmnd() and blk_add_request_payload(). */
	if (peer_req->flags & EE_IS_TRIM)
		nr_pages = 1;

	/* In most cases, we will only need one bio.  But in case the lower
	 * level restrictions happen to be different at this offset on this
	 * side than those of the sending peer, we may need to submit the
	 * request in more than one bio.
	 *
	 * Plain bio_alloc is good enough here, this is no DRBD internally
	 * generated bio, but a bio allocated on behalf of the peer.
	 */
next_bio:
	bio = bio_alloc(GFP_NOIO, nr_pages);
	if (!bio) {
		drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
		goto fail;
	}
	/* > peer_req->i.sector, unless this is the first bio */
	bio->bi_iter.bi_sector = sector;
	bio->bi_bdev = device->ldev->backing_bdev;
	bio->bi_rw = rw;
	bio->bi_private = peer_req;
	bio->bi_end_io = drbd_peer_request_endio;

	bio->bi_next = bios;
	bios = bio;
	++n_bios;

	if (rw & REQ_DISCARD) {
		bio->bi_iter.bi_size = data_size;
		goto submit;
	}

	page_chain_for_each(page) {
		unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
		if (!bio_add_page(bio, page, len, 0)) {
			/* A single page must always be possible!
			 * But in case it fails anyways,
			 * we deal with it, and complain (below). */
			if (bio->bi_vcnt == 0) {
				drbd_err(device,
					"bio_add_page failed for len=%u, "
					"bi_vcnt=0 (bi_sector=%llu)\n",
					len, (uint64_t)bio->bi_iter.bi_sector);
				err = -ENOSPC;
				goto fail;
			}
			goto next_bio;
		}
		data_size -= len;
		sector += len >> 9;
		--nr_pages;
	}
	D_ASSERT(device, data_size == 0);
submit:
	D_ASSERT(device, page == NULL);

	atomic_set(&peer_req->pending_bios, n_bios);
	/* for debugfs: update timestamp, mark as submitted */
	peer_req->submit_jif = jiffies;
	peer_req->flags |= EE_SUBMITTED;
	do {
		bio = bios;
		bios = bios->bi_next;
		bio->bi_next = NULL;

		drbd_generic_make_request(device, fault_type, bio);
	} while (bios);
	return 0;

fail:
	while (bios) {
		bio = bios;
		bios = bios->bi_next;
		bio_put(bio);
	}
	return err;
}
static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
					     struct drbd_peer_request *peer_req)
{
	struct drbd_interval *i = &peer_req->i;

	drbd_remove_interval(&device->write_requests, i);
	drbd_clear_interval(i);

	/* Wake up any processes waiting for this peer request to complete.  */
	if (i->waiting)
		wake_up(&device->misc_wait);
}
static void conn_wait_active_ee_empty(struct drbd_connection *connection)
{
	struct drbd_peer_device *peer_device;
	int vnr;

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;

		kref_get(&device->kref);
		rcu_read_unlock();
		drbd_wait_ee_list_empty(device, &device->active_ee);
		kref_put(&device->kref, drbd_destroy_device);
		rcu_read_lock();
	}
	rcu_read_unlock();
}
static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
{
	int rv;
	struct p_barrier *p = pi->data;
	struct drbd_epoch *epoch;

	/* FIXME these are unacked on connection,
	 * not a specific (peer)device.
	 */
	connection->current_epoch->barrier_nr = p->barrier;
	connection->current_epoch->connection = connection;
	rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);

	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
	 * the activity log, which means it would not be resynced in case the
	 * R_PRIMARY crashes now.
	 * Therefore we must send the barrier_ack after the barrier request was
	 * completed. */
	switch (connection->resource->write_ordering) {
	case WO_NONE:
		if (rv == FE_RECYCLED)
			return 0;

		/* receiver context, in the writeout path of the other node.
		 * avoid potential distributed deadlock */
		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
		if (epoch)
			break;
		else
			drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
		/* Fall through */

	case WO_BDEV_FLUSH:
	case WO_DRAIN_IO:
		conn_wait_active_ee_empty(connection);
		drbd_flush(connection);

		if (atomic_read(&connection->current_epoch->epoch_size)) {
			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
			if (epoch)
				break;
		}

		return 0;
	default:
		drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
			 connection->resource->write_ordering);
		return -EIO;
	}

	epoch->flags = 0;
	atomic_set(&epoch->epoch_size, 0);
	atomic_set(&epoch->active, 0);

	spin_lock(&connection->epoch_lock);
	if (atomic_read(&connection->current_epoch->epoch_size)) {
		list_add(&epoch->list, &connection->current_epoch->list);
		connection->current_epoch = epoch;
		connection->epochs++;
	} else {
		/* The current_epoch got recycled while we allocated this one... */
		kfree(epoch);
	}
	spin_unlock(&connection->epoch_lock);

	return 0;
}
/* used from receive_RSDataReply (recv_resync_read)
 * and from receive_Data */
static struct drbd_peer_request *
read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
	      struct packet_info *pi) __must_hold(local)
{
	struct drbd_device *device = peer_device->device;
	const sector_t capacity = drbd_get_capacity(device->this_bdev);
	struct drbd_peer_request *peer_req;
	struct page *page;
	int digest_size, err;
	unsigned int data_size = pi->size, ds;
	void *dig_in = peer_device->connection->int_dig_in;
	void *dig_vv = peer_device->connection->int_dig_vv;
	unsigned long *data;
	struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;

	digest_size = 0;
	if (!trim && peer_device->connection->peer_integrity_tfm) {
		digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
		/*
		 * FIXME: Receive the incoming digest into the receive buffer
		 *	  here, together with its struct p_data?
		 */
		err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
		if (err)
			return NULL;
		data_size -= digest_size;
	}

	if (trim) {
		D_ASSERT(peer_device, data_size == 0);
		data_size = be32_to_cpu(trim->size);
	}

	if (!expect(IS_ALIGNED(data_size, 512)))
		return NULL;
	/* prepare for larger trim requests. */
	if (!trim && !expect(data_size <= DRBD_MAX_BIO_SIZE))
		return NULL;

	/* even though we trust our peer,
	 * we sometimes have to double check. */
	if (sector + (data_size>>9) > capacity) {
		drbd_err(device, "request from peer beyond end of local disk: "
			"capacity: %llus < sector: %llus + size: %u\n",
			(unsigned long long)capacity,
			(unsigned long long)sector, data_size);
		return NULL;
	}

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	peer_req = drbd_alloc_peer_req(peer_device, id, sector, data_size, trim == NULL, GFP_NOIO);
	if (!peer_req)
		return NULL;

	peer_req->flags |= EE_WRITE;
	if (trim)
		return peer_req;

	ds = data_size;
	page = peer_req->pages;
	page_chain_for_each(page) {
		unsigned len = min_t(int, ds, PAGE_SIZE);
		data = kmap(page);
		err = drbd_recv_all_warn(peer_device->connection, data, len);
		if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
			drbd_err(device, "Fault injection: Corrupting data on receive\n");
			data[0] = data[0] ^ (unsigned long)-1;
		}
		kunmap(page);
		if (err) {
			drbd_free_peer_req(device, peer_req);
			return NULL;
		}
		ds -= len;
	}

	if (digest_size) {
		drbd_csum_ee(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv);
		if (memcmp(dig_in, dig_vv, digest_size)) {
			drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
				(unsigned long long)sector, data_size);
			drbd_free_peer_req(device, peer_req);
			return NULL;
		}
	}
	device->recv_cnt += data_size >> 9;
	return peer_req;
}
/* drbd_drain_block() just takes a data block
 * out of the socket input buffer, and discards it.
 */
static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
{
	struct page *page;
	int err = 0;
	void *data;

	if (!data_size)
		return 0;

	page = drbd_alloc_pages(peer_device, 1, 1);

	data = kmap(page);
	while (data_size) {
		unsigned int len = min_t(int, data_size, PAGE_SIZE);

		err = drbd_recv_all_warn(peer_device->connection, data, len);
		if (err)
			break;
		data_size -= len;
	}
	kunmap(page);
	drbd_free_pages(peer_device->device, page, 0);
	return err;
}
static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
			   sector_t sector, int data_size)
{
	struct bio_vec bvec;
	struct bvec_iter iter;
	struct bio *bio;
	int digest_size, err, expect;
	void *dig_in = peer_device->connection->int_dig_in;
	void *dig_vv = peer_device->connection->int_dig_vv;

	digest_size = 0;
	if (peer_device->connection->peer_integrity_tfm) {
		digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
		err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
		if (err)
			return err;
		data_size -= digest_size;
	}

	/* optimistically update recv_cnt.  if receiving fails below,
	 * we disconnect anyways, and counters will be reset. */
	peer_device->device->recv_cnt += data_size>>9;

	bio = req->master_bio;
	D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);

	bio_for_each_segment(bvec, bio, iter) {
		void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
		expect = min_t(int, data_size, bvec.bv_len);
		err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
		kunmap(bvec.bv_page);
		if (err)
			return err;
		data_size -= expect;
	}

	if (digest_size) {
		drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
		if (memcmp(dig_in, dig_vv, digest_size)) {
			drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
			return -EINVAL;
		}
	}

	D_ASSERT(peer_device->device, data_size == 0);
	return 0;
}
/*
 * e_end_resync_block() is called in ack_sender context via
 * drbd_finish_peer_reqs().
 */
static int e_end_resync_block(struct drbd_work *w, int unused)
{
	struct drbd_peer_request *peer_req =
		container_of(w, struct drbd_peer_request, w);
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
	sector_t sector = peer_req->i.sector;
	int err;

	D_ASSERT(device, drbd_interval_empty(&peer_req->i));

	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		drbd_set_in_sync(device, sector, peer_req->i.size);
		err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
	} else {
		/* Record failure to sync */
		drbd_rs_failed_io(device, sector, peer_req->i.size);

		err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
	}
	dec_unacked(device);

	return err;
}
static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
			    struct packet_info *pi) __releases(local)
{
	struct drbd_device *device = peer_device->device;
	struct drbd_peer_request *peer_req;

	peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
	if (!peer_req)
		goto fail;

	dec_rs_pending(device);

	inc_unacked(device);
	/* corresponding dec_unacked() in e_end_resync_block()
	 * respective _drbd_clear_done_ee */

	peer_req->w.cb = e_end_resync_block;
	peer_req->submit_jif = jiffies;

	spin_lock_irq(&device->resource->req_lock);
	list_add_tail(&peer_req->w.list, &device->sync_ee);
	spin_unlock_irq(&device->resource->req_lock);

	atomic_add(pi->size >> 9, &device->rs_sect_ev);
	if (drbd_submit_peer_request(device, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
		return 0;

	/* don't care for the reason here */
	drbd_err(device, "submit failed, triggering re-connect\n");
	spin_lock_irq(&device->resource->req_lock);
	list_del(&peer_req->w.list);
	spin_unlock_irq(&device->resource->req_lock);

	drbd_free_peer_req(device, peer_req);
fail:
	put_ldev(device);
	return -EIO;
}
static struct drbd_request *
find_request(struct drbd_device *device, struct rb_root *root, u64 id,
	     sector_t sector, bool missing_ok, const char *func)
{
	struct drbd_request *req;

	/* Request object according to our peer */
	req = (struct drbd_request *)(unsigned long)id;
	if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
		return req;
	if (!missing_ok) {
		drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
			(unsigned long)id, (unsigned long long)sector);
	}
	return NULL;
}
static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct drbd_request *req;
	sector_t sector;
	int err;
	struct p_data *p = pi->data;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	sector = be64_to_cpu(p->sector);

	spin_lock_irq(&device->resource->req_lock);
	req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
	spin_unlock_irq(&device->resource->req_lock);
	if (unlikely(!req))
		return -EIO;

	/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
	 * special casing it there for the various failure cases.
	 * still no race with drbd_fail_pending_reads */
	err = recv_dless_read(peer_device, req, sector, pi->size);
	if (!err)
		req_mod(req, DATA_RECEIVED);
	/* else: nothing. handled from drbd_disconnect...
	 * I don't think we may complete this just yet
	 * in case we are "on-disconnect: freeze" */

	return err;
}
static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	sector_t sector;
	int err;
	struct p_data *p = pi->data;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	sector = be64_to_cpu(p->sector);
	D_ASSERT(device, p->block_id == ID_SYNCER);

	if (get_ldev(device)) {
		/* data is submitted to disk within recv_resync_read.
		 * corresponding put_ldev done below on error,
		 * or in drbd_peer_request_endio. */
		err = recv_resync_read(peer_device, sector, pi);
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			drbd_err(device, "Can not write resync data to local disk.\n");

		err = drbd_drain_block(peer_device, pi->size);

		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
	}

	atomic_add(pi->size >> 9, &device->rs_sect_in);

	return err;
}
static void restart_conflicting_writes(struct drbd_device *device,
				       sector_t sector, int size)
{
	struct drbd_interval *i;
	struct drbd_request *req;

	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
		if (!i->local)
			continue;
		req = container_of(i, struct drbd_request, i);
		if (req->rq_state & RQ_LOCAL_PENDING ||
		    !(req->rq_state & RQ_POSTPONED))
			continue;
		/* as it is RQ_POSTPONED, this will cause it to
		 * be queued on the retry workqueue. */
		__req_mod(req, CONFLICT_RESOLVED, NULL);
	}
}
/*
 * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
 */
static int e_end_block(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req =
		container_of(w, struct drbd_peer_request, w);
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
	sector_t sector = peer_req->i.sector;
	int err = 0, pcmd;

	if (peer_req->flags & EE_SEND_WRITE_ACK) {
		if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
			pcmd = (device->state.conn >= C_SYNC_SOURCE &&
				device->state.conn <= C_PAUSED_SYNC_T &&
				peer_req->flags & EE_MAY_SET_IN_SYNC) ?
				P_RS_WRITE_ACK : P_WRITE_ACK;
			err = drbd_send_ack(peer_device, pcmd, peer_req);
			if (pcmd == P_RS_WRITE_ACK)
				drbd_set_in_sync(device, sector, peer_req->i.size);
		} else {
			err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
			/* we expect it to be marked out of sync anyways...
			 * maybe assert this?  */
		}
		dec_unacked(device);
	}

	/* we delete from the conflict detection hash _after_ we sent out the
	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
	if (peer_req->flags & EE_IN_INTERVAL_TREE) {
		spin_lock_irq(&device->resource->req_lock);
		D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
		drbd_remove_epoch_entry_interval(device, peer_req);
		if (peer_req->flags & EE_RESTART_REQUESTS)
			restart_conflicting_writes(device, sector, peer_req->i.size);
		spin_unlock_irq(&device->resource->req_lock);
	} else
		D_ASSERT(device, drbd_interval_empty(&peer_req->i));

	drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));

	return err;
}
static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
{
	struct drbd_peer_request *peer_req =
		container_of(w, struct drbd_peer_request, w);
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	int err;

	err = drbd_send_ack(peer_device, ack, peer_req);
	dec_unacked(peer_device->device);

	return err;
}
static int e_send_superseded(struct drbd_work *w, int unused)
{
	return e_send_ack(w, P_SUPERSEDED);
}
static int e_send_retry_write(struct drbd_work *w, int unused)
{
	struct drbd_peer_request *peer_req =
		container_of(w, struct drbd_peer_request, w);
	struct drbd_connection *connection = peer_req->peer_device->connection;

	return e_send_ack(w, connection->agreed_pro_version >= 100 ?
			     P_RETRY_WRITE : P_SUPERSEDED);
}
static bool seq_greater(u32 a, u32 b)
{
	/*
	 * We assume 32-bit wrap-around here.
	 * For 24-bit wrap-around, we would have to shift:
	 *  a <<= 8; b <<= 8;
	 */
	return (s32)a - (s32)b > 0;
}
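
/*
 * Illustration (not part of the driver): why the signed difference handles
 * wrap-around. With a = 5 and b = 0xfffffffe (b assigned shortly before the
 * counter wrapped), (s32)(a - b) == 7 > 0, so a is correctly considered
 * "greater", even though a < b as plain u32. This holds as long as the two
 * sequence numbers are less than 2^31 apart.
 */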
static u32 seq_max(u32 a, u32 b)
{
	return seq_greater(a, b) ? a : b;
}
static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
{
	struct drbd_device *device = peer_device->device;
	unsigned int newest_peer_seq;

	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
		spin_lock(&device->peer_seq_lock);
		newest_peer_seq = seq_max(device->peer_seq, peer_seq);
		device->peer_seq = newest_peer_seq;
		spin_unlock(&device->peer_seq_lock);
		/* wake up only if we actually changed device->peer_seq */
		if (peer_seq == newest_peer_seq)
			wake_up(&device->seq_wait);
	}
}
static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
{
	return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
}
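
/*
 * Illustration (not part of the driver): lengths are in bytes, sectors are
 * 512 bytes, hence the >>9. Two 4 KiB writes at sectors 0 and 8:
 *
 *	overlaps(0, 4096, 8, 4096) == 0	// [0,8) and [8,16) only touch
 *	overlaps(0, 4096, 7, 4096) == 1	// [0,8) and [7,15) share sector 7
 */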
/* maybe change sync_ee into interval trees as well? */
static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
{
	struct drbd_peer_request *rs_req;
	bool rv = false;

	spin_lock_irq(&device->resource->req_lock);
	list_for_each_entry(rs_req, &device->sync_ee, w.list) {
		if (overlaps(peer_req->i.sector, peer_req->i.size,
			     rs_req->i.sector, rs_req->i.size)) {
			rv = true;
			break;
		}
	}
	spin_unlock_irq(&device->resource->req_lock);

	return rv;
}
/* Called from receive_Data.
 * Synchronize packets on sock with packets on msock.
 *
 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
 * packet traveling on msock, they are still processed in the order they have
 * been sent.
 *
 * Note: we don't care for Ack packets overtaking P_DATA packets.
 *
 * In case packet_seq is larger than device->peer_seq number, there are
 * outstanding packets on the msock. We wait for them to arrive.
 * In case we are the logically next packet, we update device->peer_seq
 * ourselves. Correctly handles 32bit wrap around.
 *
 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
 *
 * returns 0 if we may process the packet,
 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
{
	struct drbd_device *device = peer_device->device;
	DEFINE_WAIT(wait);
	long timeout;
	int ret = 0, tp;

	if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
		return 0;

	spin_lock(&device->peer_seq_lock);
	for (;;) {
		if (!seq_greater(peer_seq - 1, device->peer_seq)) {
			device->peer_seq = seq_max(device->peer_seq, peer_seq);
			break;
		}

		if (signal_pending(current)) {
			ret = -ERESTARTSYS;
			break;
		}

		rcu_read_lock();
		tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
		rcu_read_unlock();

		if (!tp)
			break;

		/* Only need to wait if two_primaries is enabled */
		prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
		spin_unlock(&device->peer_seq_lock);
		rcu_read_lock();
		timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
		rcu_read_unlock();
		timeout = schedule_timeout(timeout);
		spin_lock(&device->peer_seq_lock);
		if (!timeout) {
			ret = -ETIMEDOUT;
			drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
			break;
		}
	}
	spin_unlock(&device->peer_seq_lock);
	finish_wait(&device->seq_wait, &wait);
	return ret;
}
/* see also bio_flags_to_wire()
 * DRBD_REQ_*, because we need to semantically map the flags to data packet
 * flags and back. We may replicate to other kernel versions. */
static unsigned long wire_flags_to_bio(u32 dpf)
{
	return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
		(dpf & DP_FUA ? REQ_FUA : 0) |
		(dpf & DP_FLUSH ? REQ_FLUSH : 0) |
		(dpf & DP_DISCARD ? REQ_DISCARD : 0);
}
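
/*
 * Illustration (not part of the driver): together with its counterpart
 * bio_flags_to_wire() on the sending side, the mapping is meant to
 * round-trip, e.g.
 *
 *	wire_flags_to_bio(... DP_FUA ...) yields REQ_FUA on this side
 *	for a bio submitted with REQ_FUA on the peer.
 *
 * The DP_* bits are the stable wire encoding; the REQ_* values may differ
 * between kernel versions, which is exactly why the mapping is explicit.
 */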
static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
				    unsigned int size)
{
	struct drbd_interval *i;

    repeat:
	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
		struct drbd_request *req;
		struct bio_and_error m;

		if (!i->local)
			continue;
		req = container_of(i, struct drbd_request, i);
		if (!(req->rq_state & RQ_POSTPONED))
			continue;
		req->rq_state &= ~RQ_POSTPONED;
		__req_mod(req, NEG_ACKED, &m);
		spin_unlock_irq(&device->resource->req_lock);
		if (m.bio)
			complete_master_bio(device, &m);
		spin_lock_irq(&device->resource->req_lock);
		goto repeat;
	}
}
static int handle_write_conflicts(struct drbd_device *device,
				  struct drbd_peer_request *peer_req)
{
	struct drbd_connection *connection = peer_req->peer_device->connection;
	bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
	sector_t sector = peer_req->i.sector;
	const unsigned int size = peer_req->i.size;
	struct drbd_interval *i;
	bool equal;
	int err;

	/*
	 * Inserting the peer request into the write_requests tree will prevent
	 * new conflicting local requests from being added.
	 */
	drbd_insert_interval(&device->write_requests, &peer_req->i);

    repeat:
	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
		if (i == &peer_req->i)
			continue;

		if (!i->local) {
			/*
			 * Our peer has sent a conflicting remote request; this
			 * should not happen in a two-node setup.  Wait for the
			 * earlier peer request to complete.
			 */
			err = drbd_wait_misc(device, i);
			if (err)
				goto out;
			goto repeat;
		}

		equal = i->sector == sector && i->size == size;
		if (resolve_conflicts) {
			/*
			 * If the peer request is fully contained within the
			 * overlapping request, it can be considered overwritten
			 * and thus superseded; otherwise, it will be retried
			 * once all overlapping requests have completed.
			 */
			bool superseded = i->sector <= sector && i->sector +
				       (i->size >> 9) >= sector + (size >> 9);

			if (!equal)
				drbd_alert(device, "Concurrent writes detected: "
					       "local=%llus +%u, remote=%llus +%u, "
					       "assuming %s came first\n",
					  (unsigned long long)i->sector, i->size,
					  (unsigned long long)sector, size,
					  superseded ? "local" : "remote");

			peer_req->w.cb = superseded ? e_send_superseded :
						   e_send_retry_write;
			list_add_tail(&peer_req->w.list, &device->done_ee);
			queue_work(connection->ack_sender, &peer_req->peer_device->send_acks_work);

			err = -ENOENT;
			goto out;
		} else {
			struct drbd_request *req =
				container_of(i, struct drbd_request, i);

			if (!equal)
				drbd_alert(device, "Concurrent writes detected: "
					       "local=%llus +%u, remote=%llus +%u\n",
					  (unsigned long long)i->sector, i->size,
					  (unsigned long long)sector, size);

			if (req->rq_state & RQ_LOCAL_PENDING ||
			    !(req->rq_state & RQ_POSTPONED)) {
				/*
				 * Wait for the node with the discard flag to
				 * decide if this request has been superseded
				 * or needs to be retried.
				 * Requests that have been superseded will
				 * disappear from the write_requests tree.
				 *
				 * In addition, wait for the conflicting
				 * request to finish locally before submitting
				 * the conflicting peer request.
				 */
				err = drbd_wait_misc(device, &req->i);
				if (err) {
					_conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
					fail_postponed_requests(device, sector, size);
					goto out;
				}
				goto repeat;
			}

			/*
			 * Remember to restart the conflicting requests after
			 * the new peer request has completed.
			 */
			peer_req->flags |= EE_RESTART_REQUESTS;
		}
	}
	err = 0;

    out:
	if (err)
		drbd_remove_epoch_entry_interval(device, peer_req);
	return err;
}
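
/* Worked example (illustration only; superseded_example is a hypothetical
 * helper, not used by the driver): with 512-byte sectors, a local write at
 * sector 100 of 8192 bytes covers [100, 116).  An incoming peer write at
 * sector 104 of 4096 bytes covers [104, 112) and is fully contained, so it
 * counts as superseded; a peer write at sector 112 would stick out past 116
 * and be retried instead. */
static inline bool superseded_example(void)
{
	sector_t i_sector = 100, p_sector = 104;
	unsigned int i_size = 8192, p_size = 4096;

	return i_sector <= p_sector &&
	       i_sector + (i_size >> 9) >= p_sector + (p_size >> 9);
}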
/* mirrored write */
static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct net_conf *nc;
	sector_t sector;
	struct drbd_peer_request *peer_req;
	struct p_data *p = pi->data;
	u32 peer_seq = be32_to_cpu(p->seq_num);
	int rw = WRITE;
	u32 dp_flags;
	int err, tp;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	if (!get_ldev(device)) {
		int err2;

		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
		atomic_inc(&connection->current_epoch->epoch_size);
		err2 = drbd_drain_block(peer_device, pi->size);
		if (!err)
			err = err2;
		return err;
	}

	/*
	 * Corresponding put_ldev done either below (on various errors), or in
	 * drbd_peer_request_endio, if we successfully submit the data at the
	 * end of this function.
	 */

	sector = be64_to_cpu(p->sector);
	peer_req = read_in_block(peer_device, p->block_id, sector, pi);
	if (!peer_req) {
		put_ldev(device);
		return -EIO;
	}

	peer_req->w.cb = e_end_block;
	peer_req->submit_jif = jiffies;
	peer_req->flags |= EE_APPLICATION;

	dp_flags = be32_to_cpu(p->dp_flags);
	rw |= wire_flags_to_bio(dp_flags);
	if (pi->cmd == P_TRIM) {
		struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
		peer_req->flags |= EE_IS_TRIM;
		if (!blk_queue_discard(q))
			peer_req->flags |= EE_IS_TRIM_USE_ZEROOUT;
		D_ASSERT(peer_device, peer_req->i.size > 0);
		D_ASSERT(peer_device, rw & REQ_DISCARD);
		D_ASSERT(peer_device, peer_req->pages == NULL);
	} else if (peer_req->pages == NULL) {
		D_ASSERT(device, peer_req->i.size == 0);
		D_ASSERT(device, dp_flags & DP_FLUSH);
	}

	if (dp_flags & DP_MAY_SET_IN_SYNC)
		peer_req->flags |= EE_MAY_SET_IN_SYNC;

	spin_lock(&connection->epoch_lock);
	peer_req->epoch = connection->current_epoch;
	atomic_inc(&peer_req->epoch->epoch_size);
	atomic_inc(&peer_req->epoch->active);
	spin_unlock(&connection->epoch_lock);

	rcu_read_lock();
	nc = rcu_dereference(peer_device->connection->net_conf);
	tp = nc->two_primaries;
	if (peer_device->connection->agreed_pro_version < 100) {
		switch (nc->wire_protocol) {
		case DRBD_PROT_C:
			dp_flags |= DP_SEND_WRITE_ACK;
			break;
		case DRBD_PROT_B:
			dp_flags |= DP_SEND_RECEIVE_ACK;
			break;
		}
	}
	rcu_read_unlock();

	if (dp_flags & DP_SEND_WRITE_ACK) {
		peer_req->flags |= EE_SEND_WRITE_ACK;
		inc_unacked(device);
		/* corresponding dec_unacked() in e_end_block()
		 * respective _drbd_clear_done_ee */
	}

	if (dp_flags & DP_SEND_RECEIVE_ACK) {
		/* I really don't like it that the receiver thread
		 * sends on the msock, but anyways */
		drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
	}

	if (tp) {
		/* two primaries implies protocol C */
		D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
		peer_req->flags |= EE_IN_INTERVAL_TREE;
		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
		if (err)
			goto out_interrupted;
		spin_lock_irq(&device->resource->req_lock);
		err = handle_write_conflicts(device, peer_req);
		if (err) {
			spin_unlock_irq(&device->resource->req_lock);
			if (err == -ENOENT) {
				put_ldev(device);
				return 0;
			}
			goto out_interrupted;
		}
	} else {
		update_peer_seq(peer_device, peer_seq);
		spin_lock_irq(&device->resource->req_lock);
	}
	/* if we use the zeroout fallback code, we process synchronously
	 * and we wait for all pending requests, respectively wait for
	 * active_ee to become empty in drbd_submit_peer_request();
	 * better not add ourselves here. */
	if ((peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) == 0)
		list_add_tail(&peer_req->w.list, &device->active_ee);
	spin_unlock_irq(&device->resource->req_lock);

	if (device->state.conn == C_SYNC_TARGET)
		wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));

	if (device->state.pdsk < D_INCONSISTENT) {
		/* In case we have the only disk of the cluster, */
		drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
		peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
		drbd_al_begin_io(device, &peer_req->i);
		peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
	}

	err = drbd_submit_peer_request(device, peer_req, rw, DRBD_FAULT_DT_WR);
	if (!err)
		return 0;

	/* don't care for the reason here */
	drbd_err(device, "submit failed, triggering re-connect\n");
	spin_lock_irq(&device->resource->req_lock);
	list_del(&peer_req->w.list);
	drbd_remove_epoch_entry_interval(device, peer_req);
	spin_unlock_irq(&device->resource->req_lock);
	if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
		peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
		drbd_al_complete_io(device, &peer_req->i);
	}

out_interrupted:
	drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT + EV_CLEANUP);
	put_ldev(device);
	drbd_free_peer_req(device, peer_req);
	return err;
}
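
/* Illustration only (ack_policy_example is a hypothetical helper, not used
 * by the driver): for peers older than protocol version 100, the ack policy
 * above is derived from the configured wire protocol; newer peers carry the
 * DP_SEND_* bits in the packet itself.  Protocol C wants a write ack (data
 * is on stable storage), protocol B only a receive ack, protocol A none. */
static inline u32 ack_policy_example(int wire_protocol)
{
	switch (wire_protocol) {
	case DRBD_PROT_C: return DP_SEND_WRITE_ACK;	/* ack after disk write */
	case DRBD_PROT_B: return DP_SEND_RECEIVE_ACK;	/* ack on receipt */
	default:	  return 0;			/* protocol A: no ack */
	}
}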
/* We may throttle resync, if the lower device seems to be busy,
 * and current sync rate is above c_min_rate.
 *
 * To decide whether or not the lower device is busy, we use a scheme similar
 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
 * activity (more than 64 sectors) that we cannot account for with our own
 * resync activity, it obviously is "busy".
 *
 * The current sync rate used here uses only the most recent two step marks,
 * to have a short time average so we can react faster.
 */
bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
		bool throttle_if_app_is_waiting)
{
	struct lc_element *tmp;
	bool throttle = drbd_rs_c_min_rate_throttle(device);

	if (!throttle || throttle_if_app_is_waiting)
		return throttle;

	spin_lock_irq(&device->al_lock);
	tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
	if (tmp) {
		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
		if (test_bit(BME_PRIORITY, &bm_ext->flags))
			throttle = false;
		/* Do not slow down if app IO is already waiting for this extent,
		 * and our progress is necessary for application IO to complete. */
	}
	spin_unlock_irq(&device->al_lock);

	return throttle;
}
bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
{
	struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
	unsigned long db, dt, dbdt;
	unsigned int c_min_rate;
	int curr_events;

	rcu_read_lock();
	c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
	rcu_read_unlock();

	/* feature disabled? */
	if (c_min_rate == 0)
		return false;

	curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
		      (int)part_stat_read(&disk->part0, sectors[1]) -
			atomic_read(&device->rs_sect_ev);

	if (atomic_read(&device->ap_actlog_cnt)
	    || curr_events - device->rs_last_events > 64) {
		unsigned long rs_left;
		int i;

		device->rs_last_events = curr_events;

		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
		 * approx. */
		i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;

		if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
			rs_left = device->ov_left;
		else
			rs_left = drbd_bm_total_weight(device) - device->rs_failed;

		dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
		if (!dt)
			dt++;
		db = device->rs_mark_left[i] - rs_left;
		dbdt = Bit2KB(db/dt);

		if (dbdt > c_min_rate)
			return true;
	}
	return false;
}
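
/* Worked example (illustration only): if the chosen sync mark is dt = 4
 * seconds old and db = 8192 bitmap bits were cleared since then, the recent
 * rate is Bit2KB(8192 / 4) = 2048 bits/s, i.e. 8192 KiB/s at 4 KiB per
 * bitmap bit.  With c_min_rate = 4000 KiB/s this exceeds the floor, so the
 * resync is considered fast enough to throttle. */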
static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	sector_t sector;
	sector_t capacity;
	struct drbd_peer_request *peer_req;
	struct digest_info *di = NULL;
	int size, verb;
	unsigned int fault_type;
	struct p_block_req *p =	pi->data;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;
	capacity = drbd_get_capacity(device->this_bdev);

	sector = be64_to_cpu(p->sector);
	size   = be32_to_cpu(p->blksize);

	if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
				(unsigned long long)sector, size);
		return -EINVAL;
	}
	if (sector + (size>>9) > capacity) {
		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
				(unsigned long long)sector, size);
		return -EINVAL;
	}

	if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
		verb = 1;
		switch (pi->cmd) {
		case P_DATA_REQUEST:
			drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
			break;
		case P_RS_DATA_REQUEST:
		case P_CSUM_RS_REQUEST:
		case P_OV_REQUEST:
			drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY, p);
			break;
		case P_OV_REPLY:
			verb = 0;
			dec_rs_pending(device);
			drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
			break;
		default:
			BUG();
		}
		if (verb && __ratelimit(&drbd_ratelimit_state))
			drbd_err(device, "Can not satisfy peer's read request, "
			    "no local data.\n");

		/* drain possible payload */
		return drbd_drain_block(peer_device, pi->size);
	}

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place.  */
	peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
			true /* has real payload */, GFP_NOIO);
	if (!peer_req) {
		put_ldev(device);
		return -ENOMEM;
	}

	switch (pi->cmd) {
	case P_DATA_REQUEST:
		peer_req->w.cb = w_e_end_data_req;
		fault_type = DRBD_FAULT_DT_RD;
		/* application IO, don't drbd_rs_begin_io */
		peer_req->flags |= EE_APPLICATION;
		goto submit;

	case P_RS_DATA_REQUEST:
		peer_req->w.cb = w_e_end_rsdata_req;
		fault_type = DRBD_FAULT_RS_RD;
		/* used in the sector offset progress display */
		device->bm_resync_fo = BM_SECT_TO_BIT(sector);
		break;

	case P_OV_REPLY:
	case P_CSUM_RS_REQUEST:
		fault_type = DRBD_FAULT_RS_RD;
		di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
		if (!di)
			goto out_free_e;

		di->digest_size = pi->size;
		di->digest = (((char *)di)+sizeof(struct digest_info));

		peer_req->digest = di;
		peer_req->flags |= EE_HAS_DIGEST;

		if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
			goto out_free_e;

		if (pi->cmd == P_CSUM_RS_REQUEST) {
			D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
			peer_req->w.cb = w_e_end_csum_rs_req;
			/* used in the sector offset progress display */
			device->bm_resync_fo = BM_SECT_TO_BIT(sector);
			/* remember to report stats in drbd_resync_finished */
			device->use_csums = true;
		} else if (pi->cmd == P_OV_REPLY) {
			/* track progress, we may need to throttle */
			atomic_add(size >> 9, &device->rs_sect_in);
			peer_req->w.cb = w_e_end_ov_reply;
			dec_rs_pending(device);
			/* drbd_rs_begin_io done when we sent this request,
			 * but accounting still needs to be done. */
			goto submit_for_resync;
		}
		break;

	case P_OV_REQUEST:
		if (device->ov_start_sector == ~(sector_t)0 &&
		    peer_device->connection->agreed_pro_version >= 90) {
			unsigned long now = jiffies;
			int i;
			device->ov_start_sector = sector;
			device->ov_position = sector;
			device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
			device->rs_total = device->ov_left;
			for (i = 0; i < DRBD_SYNC_MARKS; i++) {
				device->rs_mark_left[i] = device->ov_left;
				device->rs_mark_time[i] = now;
			}
			drbd_info(device, "Online Verify start sector: %llu\n",
					(unsigned long long)sector);
		}
		peer_req->w.cb = w_e_end_ov_req;
		fault_type = DRBD_FAULT_RS_RD;
		break;

	default:
		BUG();
	}

	/* Throttle, drbd_rs_begin_io and submit should become asynchronous
	 * wrt the receiver, but it is not as straightforward as it may seem.
	 * Various places in the resync start and stop logic assume resync
	 * requests are processed in order, requeuing this on the worker thread
	 * introduces a bunch of new code for synchronization between threads.
	 *
	 * Unlimited throttling before drbd_rs_begin_io may stall the resync
	 * "forever", throttling after drbd_rs_begin_io will lock that extent
	 * for application writes for the same time.  For now, just throttle
	 * here, where the rest of the code expects the receiver to sleep for
	 * a while, anyways. */

	/* Throttle before drbd_rs_begin_io, as that locks out application IO;
	 * this defers syncer requests for some time, before letting at least
	 * one request through.  The resync controller on the receiving side
	 * will adapt to the incoming rate accordingly.
	 *
	 * We cannot throttle here if remote is Primary/SyncTarget:
	 * we would also throttle its application reads.
	 * In that case, throttling is done on the SyncTarget only. */

	/* Even though this may be a resync request, we do add to "read_ee";
	 * "sync_ee" is only used for resync WRITEs.
	 * Add to list early, so debugfs can find this request
	 * even if we have to sleep below. */
	spin_lock_irq(&device->resource->req_lock);
	list_add_tail(&peer_req->w.list, &device->read_ee);
	spin_unlock_irq(&device->resource->req_lock);

	update_receiver_timing_details(connection, drbd_rs_should_slow_down);
	if (device->state.peer != R_PRIMARY
	    && drbd_rs_should_slow_down(device, sector, false))
		schedule_timeout_uninterruptible(HZ/10);
	update_receiver_timing_details(connection, drbd_rs_begin_io);
	if (drbd_rs_begin_io(device, sector))
		goto out_free_e;

submit_for_resync:
	atomic_add(size >> 9, &device->rs_sect_ev);

submit:
	update_receiver_timing_details(connection, drbd_submit_peer_request);
	inc_unacked(device);
	if (drbd_submit_peer_request(device, peer_req, READ, fault_type) == 0)
		return 0;

	/* don't care for the reason here */
	drbd_err(device, "submit failed, triggering re-connect\n");

out_free_e:
	spin_lock_irq(&device->resource->req_lock);
	list_del(&peer_req->w.list);
	spin_unlock_irq(&device->resource->req_lock);
	/* no drbd_rs_complete_io(), we are dropping the connection anyways */

	put_ldev(device);
	drbd_free_peer_req(device, peer_req);
	return -EIO;
}
/**
 * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
 */
static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
{
	struct drbd_device *device = peer_device->device;
	int self, peer, rv = -100;
	unsigned long ch_self, ch_peer;
	enum drbd_after_sb_p after_sb_0p;

	self = device->ldev->md.uuid[UI_BITMAP] & 1;
	peer = device->p_uuid[UI_BITMAP] & 1;

	ch_peer = device->p_uuid[UI_SIZE];
	ch_self = device->comm_bm_set;

	rcu_read_lock();
	after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
	rcu_read_unlock();
	switch (after_sb_0p) {
	case ASB_CONSENSUS:
	case ASB_DISCARD_SECONDARY:
	case ASB_CALL_HELPER:
	case ASB_VIOLENTLY:
		drbd_err(device, "Configuration error.\n");
		break;
	case ASB_DISCONNECT:
		break;
	case ASB_DISCARD_YOUNGER_PRI:
		if (self == 0 && peer == 1) {
			rv = -1;
			break;
		}
		if (self == 1 && peer == 0) {
			rv =  1;
			break;
		}
		/* Else fall through to one of the other strategies... */
	case ASB_DISCARD_OLDER_PRI:
		if (self == 0 && peer == 1) {
			rv = 1;
			break;
		}
		if (self == 1 && peer == 0) {
			rv = -1;
			break;
		}
		/* Else fall through to one of the other strategies... */
		drbd_warn(device, "Discard younger/older primary did not find a decision\n"
			  "Using discard-least-changes instead\n");
	case ASB_DISCARD_ZERO_CHG:
		if (ch_peer == 0 && ch_self == 0) {
			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
				? -1 : 1;
			break;
		} else {
			if (ch_peer == 0) { rv =  1; break; }
			if (ch_self == 0) { rv = -1; break; }
		}
		if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
			break;
	case ASB_DISCARD_LEAST_CHG:
		if	(ch_self < ch_peer)
			rv = -1;
		else if (ch_self > ch_peer)
			rv =  1;
		else /* ( ch_self == ch_peer ) */
		     /* Well, then use something else. */
			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
				? -1 : 1;
		break;
	case ASB_DISCARD_LOCAL:
		rv = -1;
		break;
	case ASB_DISCARD_REMOTE:
		rv =  1;
	}

	return rv;
}
/**
 * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
 */
static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
{
	struct drbd_device *device = peer_device->device;
	int hg, rv = -100;
	enum drbd_after_sb_p after_sb_1p;

	rcu_read_lock();
	after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
	rcu_read_unlock();
	switch (after_sb_1p) {
	case ASB_DISCARD_YOUNGER_PRI:
	case ASB_DISCARD_OLDER_PRI:
	case ASB_DISCARD_LEAST_CHG:
	case ASB_DISCARD_LOCAL:
	case ASB_DISCARD_REMOTE:
	case ASB_DISCARD_ZERO_CHG:
		drbd_err(device, "Configuration error.\n");
		break;
	case ASB_DISCONNECT:
		break;
	case ASB_CONSENSUS:
		hg = drbd_asb_recover_0p(peer_device);
		if (hg == -1 && device->state.role == R_SECONDARY)
			rv = hg;
		if (hg == 1  && device->state.role == R_PRIMARY)
			rv = hg;
		break;
	case ASB_VIOLENTLY:
		rv = drbd_asb_recover_0p(peer_device);
		break;
	case ASB_DISCARD_SECONDARY:
		return device->state.role == R_PRIMARY ? 1 : -1;
	case ASB_CALL_HELPER:
		hg = drbd_asb_recover_0p(peer_device);
		if (hg == -1 && device->state.role == R_PRIMARY) {
			enum drbd_state_rv rv2;

			/* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
			 * we might be here in C_WF_REPORT_PARAMS which is transient.
			 * we do not need to wait for the after state change work either. */
			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
			if (rv2 != SS_SUCCESS) {
				drbd_khelper(device, "pri-lost-after-sb");
			} else {
				drbd_warn(device, "Successfully gave up primary role.\n");
				rv = hg;
			}
		} else
			rv = hg;
	}

	return rv;
}
/**
 * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
 */
static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
{
	struct drbd_device *device = peer_device->device;
	int hg, rv = -100;
	enum drbd_after_sb_p after_sb_2p;

	rcu_read_lock();
	after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
	rcu_read_unlock();
	switch (after_sb_2p) {
	case ASB_DISCARD_YOUNGER_PRI:
	case ASB_DISCARD_OLDER_PRI:
	case ASB_DISCARD_LEAST_CHG:
	case ASB_DISCARD_LOCAL:
	case ASB_DISCARD_REMOTE:
	case ASB_CONSENSUS:
	case ASB_DISCARD_SECONDARY:
	case ASB_DISCARD_ZERO_CHG:
		drbd_err(device, "Configuration error.\n");
		break;
	case ASB_VIOLENTLY:
		rv = drbd_asb_recover_0p(peer_device);
		break;
	case ASB_DISCONNECT:
		break;
	case ASB_CALL_HELPER:
		hg = drbd_asb_recover_0p(peer_device);
		if (hg == -1) {
			enum drbd_state_rv rv2;

			/* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
			 * we might be here in C_WF_REPORT_PARAMS which is transient.
			 * we do not need to wait for the after state change work either. */
			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
			if (rv2 != SS_SUCCESS) {
				drbd_khelper(device, "pri-lost-after-sb");
			} else {
				drbd_warn(device, "Successfully gave up primary role.\n");
				rv = hg;
			}
		} else
			rv = hg;
	}

	return rv;
}
static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
			   u64 bits, u64 flags)
{
	if (!uuid) {
		drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
		return;
	}
	drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
	     text,
	     (unsigned long long)uuid[UI_CURRENT],
	     (unsigned long long)uuid[UI_BITMAP],
	     (unsigned long long)uuid[UI_HISTORY_START],
	     (unsigned long long)uuid[UI_HISTORY_END],
	     (unsigned long long)bits,
	     (unsigned long long)flags);
}
/*
  100	after split brain try auto recover
    2	C_SYNC_SOURCE set BitMap
    1	C_SYNC_SOURCE use BitMap
    0	no Sync
   -1	C_SYNC_TARGET use BitMap
   -2	C_SYNC_TARGET set BitMap
 -100	after split brain, disconnect
-1000	unrelated data
-1091	requires proto 91
-1096	requires proto 96
 */
static int drbd_uuid_compare(struct drbd_device *const device, int *rule_nr) __must_hold(local)
{
	struct drbd_peer_device *const peer_device = first_peer_device(device);
	struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
	u64 self, peer;
	int i, j;

	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);

	*rule_nr = 10;
	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
		return 0;

	*rule_nr = 20;
	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
	     peer != UUID_JUST_CREATED)
		return -2;

	*rule_nr = 30;
	if (self != UUID_JUST_CREATED &&
	    (peer == UUID_JUST_CREATED || peer == (u64)0))
		return 2;

	if (self == peer) {
		int rct, dc; /* roles at crash time */

		if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {

			if (connection->agreed_pro_version < 91)
				return -1091;

			if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
			    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
				drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
				drbd_uuid_move_history(device);
				device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
				device->ldev->md.uuid[UI_BITMAP] = 0;

				drbd_uuid_dump(device, "self", device->ldev->md.uuid,
					       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
				*rule_nr = 34;
			} else {
				drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
				*rule_nr = 36;
			}

			return 1;
		}

		if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {

			if (connection->agreed_pro_version < 91)
				return -1091;

			if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
			    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
				drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");

				device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
				device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
				device->p_uuid[UI_BITMAP] = 0UL;

				drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
				*rule_nr = 35;
			} else {
				drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
				*rule_nr = 37;
			}

			return -1;
		}

		/* Common power [off|failure] */
		rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
			(device->p_uuid[UI_FLAGS] & 2);
		/* lowest bit is set when we were primary,
		 * next bit (weight 2) is set when peer was primary */
		*rule_nr = 40;

		switch (rct) {
		case 0: /* !self_pri && !peer_pri */ return 0;
		case 1: /*  self_pri && !peer_pri */ return 1;
		case 2: /* !self_pri &&  peer_pri */ return -1;
		case 3: /*  self_pri &&  peer_pri */
			dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
			return dc ? -1 : 1;
		}
	}

	*rule_nr = 50;
	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
	if (self == peer)
		return -1;

	*rule_nr = 51;
	peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
	if (self == peer) {
		if (connection->agreed_pro_version < 96 ?
		    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
		    (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
		    peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
			/* The last P_SYNC_UUID did not get through. Undo the last start of
			   resync as sync source modifications of the peer's UUIDs. */

			if (connection->agreed_pro_version < 91)
				return -1091;

			device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
			device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];

			drbd_info(device, "Lost last syncUUID packet, corrected:\n");
			drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);

			return -1;
		}
	}

	*rule_nr = 60;
	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
		peer = device->p_uuid[i] & ~((u64)1);
		if (self == peer)
			return -2;
	}

	*rule_nr = 70;
	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
	if (self == peer)
		return 1;

	*rule_nr = 71;
	self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
	if (self == peer) {
		if (connection->agreed_pro_version < 96 ?
		    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
		    (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
		    self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
			/* The last P_SYNC_UUID did not get through. Undo the last start of
			   resync as sync source modifications of our UUIDs. */

			if (connection->agreed_pro_version < 91)
				return -1091;

			__drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
			__drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);

			drbd_info(device, "Last syncUUID did not get through, corrected:\n");
			drbd_uuid_dump(device, "self", device->ldev->md.uuid,
				       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);

			return 1;
		}
	}

	*rule_nr = 80;
	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
		self = device->ldev->md.uuid[i] & ~((u64)1);
		if (self == peer)
			return 2;
	}

	*rule_nr = 90;
	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
	if (self == peer && self != ((u64)0))
		return 100;

	*rule_nr = 100;
	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
		self = device->ldev->md.uuid[i] & ~((u64)1);
		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
			peer = device->p_uuid[j] & ~((u64)1);
			if (self == peer)
				return -100;
		}
	}

	return -1000;
}
/* drbd_sync_handshake() returns the new conn state on success, or
   CONN_MASK (-1) on failure.
 */
static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
					   enum drbd_role peer_role,
					   enum drbd_disk_state peer_disk) __must_hold(local)
{
	struct drbd_device *device = peer_device->device;
	enum drbd_conns rv = C_MASK;
	enum drbd_disk_state mydisk;
	struct net_conf *nc;
	int hg, rule_nr, rr_conflict, tentative;

	mydisk = device->state.disk;
	if (mydisk == D_NEGOTIATING)
		mydisk = device->new_state_tmp.disk;

	drbd_info(device, "drbd_sync_handshake:\n");

	spin_lock_irq(&device->ldev->md.uuid_lock);
	drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
	drbd_uuid_dump(device, "peer", device->p_uuid,
		       device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);

	hg = drbd_uuid_compare(device, &rule_nr);
	spin_unlock_irq(&device->ldev->md.uuid_lock);

	drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);

	if (hg == -1000) {
		drbd_alert(device, "Unrelated data, aborting!\n");
		return C_MASK;
	}
	if (hg < -1000) {
		drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
		return C_MASK;
	}

	if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
	    (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
		int f = (hg == -100) || abs(hg) == 2;
		hg = mydisk > D_INCONSISTENT ? 1 : -1;
		if (f)
			hg = hg*2;
		drbd_info(device, "Becoming sync %s due to disk states.\n",
		     hg > 0 ? "source" : "target");
	}

	if (abs(hg) == 100)
		drbd_khelper(device, "initial-split-brain");

	rcu_read_lock();
	nc = rcu_dereference(peer_device->connection->net_conf);

	if (hg == 100 || (hg == -100 && nc->always_asbp)) {
		int pcount = (device->state.role == R_PRIMARY)
			   + (peer_role == R_PRIMARY);
		int forced = (hg == -100);

		switch (pcount) {
		case 0:
			hg = drbd_asb_recover_0p(peer_device);
			break;
		case 1:
			hg = drbd_asb_recover_1p(peer_device);
			break;
		case 2:
			hg = drbd_asb_recover_2p(peer_device);
			break;
		}
		if (abs(hg) < 100) {
			drbd_warn(device, "Split-Brain detected, %d primaries, "
			     "automatically solved. Sync from %s node\n",
			     pcount, (hg < 0) ? "peer" : "this");
			if (forced) {
				drbd_warn(device, "Doing a full sync, since"
				     " UUIDs were ambiguous.\n");
				hg = hg*2;
			}
		}
	}

	if (hg == -100) {
		if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
			hg = -1;
		if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
			hg = 1;

		if (abs(hg) < 100)
			drbd_warn(device, "Split-Brain detected, manually solved. "
			     "Sync from %s node\n",
			     (hg < 0) ? "peer" : "this");
	}
	rr_conflict = nc->rr_conflict;
	tentative = nc->tentative;
	rcu_read_unlock();

	if (hg == -100) {
		/* FIXME this log message is not correct if we end up here
		 * after an attempted attach on a diskless node.
		 * We just refuse to attach -- well, we drop the "connection"
		 * to that disk, in a way... */
		drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
		drbd_khelper(device, "split-brain");
		return C_MASK;
	}

	if (hg > 0 && mydisk <= D_INCONSISTENT) {
		drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
		return C_MASK;
	}

	if (hg < 0 && /* by intention we do not use mydisk here. */
	    device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
		switch (rr_conflict) {
		case ASB_CALL_HELPER:
			drbd_khelper(device, "pri-lost");
			/* fall through */
		case ASB_DISCONNECT:
			drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
			return C_MASK;
		case ASB_VIOLENTLY:
			drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
			     "assumption\n");
		}
	}

	if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
		if (hg == 0)
			drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
		else
			drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
				 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
				 abs(hg) >= 2 ? "full" : "bit-map based");
		return C_MASK;
	}

	if (abs(hg) >= 2) {
		drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
		if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
					BM_LOCKED_SET_ALLOWED))
			return C_MASK;
	}

	if (hg > 0) { /* become sync source. */
		rv = C_WF_BITMAP_S;
	} else if (hg < 0) { /* become sync target */
		rv = C_WF_BITMAP_T;
	} else {
		rv = C_CONNECTED;
		if (drbd_bm_total_weight(device)) {
			drbd_info(device, "No resync, but %lu bits in bitmap!\n",
			     drbd_bm_total_weight(device));
		}
	}

	return rv;
}
static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
{
	/* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
	if (peer == ASB_DISCARD_REMOTE)
		return ASB_DISCARD_LOCAL;

	/* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
	if (peer == ASB_DISCARD_LOCAL)
		return ASB_DISCARD_REMOTE;

	/* everything else is valid if they are equal on both sides. */
	return peer;
}
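
/* Illustration only (convert_after_sb_example is a hypothetical helper, not
 * used by the driver): the peer sends its own point of view, so its "discard
 * remote data" is our "discard local data"; a mirrored pair of settings
 * therefore converts into an equal pair, while symmetric policies pass
 * through unchanged. */
static inline bool convert_after_sb_example(void)
{
	return convert_after_sb(ASB_DISCARD_REMOTE) == ASB_DISCARD_LOCAL &&
	       convert_after_sb(ASB_DISCARD_LEAST_CHG) == ASB_DISCARD_LEAST_CHG;
}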
static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
{
	struct p_protocol *p = pi->data;
	enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
	int p_proto, p_discard_my_data, p_two_primaries, cf;
	struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
	char integrity_alg[SHARED_SECRET_MAX] = "";
	struct crypto_ahash *peer_integrity_tfm = NULL;
	void *int_dig_in = NULL, *int_dig_vv = NULL;

	p_proto		= be32_to_cpu(p->protocol);
	p_after_sb_0p	= be32_to_cpu(p->after_sb_0p);
	p_after_sb_1p	= be32_to_cpu(p->after_sb_1p);
	p_after_sb_2p	= be32_to_cpu(p->after_sb_2p);
	p_two_primaries = be32_to_cpu(p->two_primaries);
	cf		= be32_to_cpu(p->conn_flags);
	p_discard_my_data = cf & CF_DISCARD_MY_DATA;

	if (connection->agreed_pro_version >= 87) {
		int err;

		if (pi->size > sizeof(integrity_alg))
			return -EIO;
		err = drbd_recv_all(connection, integrity_alg, pi->size);
		if (err)
			return err;
		integrity_alg[SHARED_SECRET_MAX - 1] = 0;
	}

	if (pi->cmd != P_PROTOCOL_UPDATE) {
		clear_bit(CONN_DRY_RUN, &connection->flags);

		if (cf & CF_DRY_RUN)
			set_bit(CONN_DRY_RUN, &connection->flags);

		rcu_read_lock();
		nc = rcu_dereference(connection->net_conf);

		if (p_proto != nc->wire_protocol) {
			drbd_err(connection, "incompatible %s settings\n", "protocol");
			goto disconnect_rcu_unlock;
		}

		if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
			drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
			goto disconnect_rcu_unlock;
		}

		if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
			drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
			goto disconnect_rcu_unlock;
		}

		if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
			drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
			goto disconnect_rcu_unlock;
		}

		if (p_discard_my_data && nc->discard_my_data) {
			drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
			goto disconnect_rcu_unlock;
		}

		if (p_two_primaries != nc->two_primaries) {
			drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
			goto disconnect_rcu_unlock;
		}

		if (strcmp(integrity_alg, nc->integrity_alg)) {
			drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
			goto disconnect_rcu_unlock;
		}

		rcu_read_unlock();
	}

	if (integrity_alg[0]) {
		int hash_size;

		/*
		 * We can only change the peer data integrity algorithm
		 * here.  Changing our own data integrity algorithm
		 * requires that we send a P_PROTOCOL_UPDATE packet at
		 * the same time; otherwise, the peer has no way to
		 * tell between which packets the algorithm should
		 * change.
		 */

		peer_integrity_tfm = crypto_alloc_ahash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
		if (!peer_integrity_tfm) {
			drbd_err(connection, "peer data-integrity-alg %s not supported\n",
				 integrity_alg);
			goto disconnect;
		}

		hash_size = crypto_ahash_digestsize(peer_integrity_tfm);
		int_dig_in = kmalloc(hash_size, GFP_KERNEL);
		int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
		if (!(int_dig_in && int_dig_vv)) {
			drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
			goto disconnect;
		}
	}

	new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
	if (!new_net_conf) {
		drbd_err(connection, "Allocation of new net_conf failed\n");
		goto disconnect;
	}

	mutex_lock(&connection->data.mutex);
	mutex_lock(&connection->resource->conf_update);
	old_net_conf = connection->net_conf;
	*new_net_conf = *old_net_conf;

	new_net_conf->wire_protocol = p_proto;
	new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
	new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
	new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
	new_net_conf->two_primaries = p_two_primaries;

	rcu_assign_pointer(connection->net_conf, new_net_conf);
	mutex_unlock(&connection->resource->conf_update);
	mutex_unlock(&connection->data.mutex);

	crypto_free_ahash(connection->peer_integrity_tfm);
	kfree(connection->int_dig_in);
	kfree(connection->int_dig_vv);
	connection->peer_integrity_tfm = peer_integrity_tfm;
	connection->int_dig_in = int_dig_in;
	connection->int_dig_vv = int_dig_vv;

	if (strcmp(old_net_conf->integrity_alg, integrity_alg))
		drbd_info(connection, "peer data-integrity-alg: %s\n",
			  integrity_alg[0] ? integrity_alg : "(none)");

	synchronize_rcu();
	kfree(old_net_conf);
	return 0;

disconnect_rcu_unlock:
	rcu_read_unlock();
disconnect:
	crypto_free_ahash(peer_integrity_tfm);
	kfree(int_dig_in);
	kfree(int_dig_vv);
	conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
	return -EIO;
}
/* helper function
 * input: alg name, feature name
 * return: NULL (alg name was "")
 *         ERR_PTR(error) if something goes wrong
 *         or the crypto hash ptr, if it worked out ok. */
static struct crypto_ahash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
		const char *alg, const char *name)
{
	struct crypto_ahash *tfm;

	if (!alg[0])
		return NULL;

	tfm = crypto_alloc_ahash(alg, 0, CRYPTO_ALG_ASYNC);
	if (IS_ERR(tfm)) {
		drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
			alg, name, PTR_ERR(tfm));
	}
	return tfm;
}
static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
{
	void *buffer = connection->data.rbuf;
	int size = pi->size;

	while (size) {
		int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
		s = drbd_recv(connection, buffer, s);
		if (s <= 0) {
			if (s < 0)
				return s;
			break;
		}
		size -= s;
	}
	if (size)
		return -EIO;
	return 0;
}
/*
 * config_unknown_volume  -  device configuration command for unknown volume
 *
 * When a device is added to an existing connection, the node on which the
 * device is added first will send configuration commands to its peer but the
 * peer will not know about the device yet. It will warn and ignore these
 * commands. Once the device is added on the second node, the second node will
 * send the same device configuration commands, but in the other direction.
 *
 * (We can also end up here if drbd is misconfigured.)
 */
static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
{
	drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
		  cmdname(pi->cmd), pi->vnr);
	return ignore_remaining_packet(connection, pi);
}
static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_rs_param_95 *p;
	unsigned int header_size, data_size, exp_max_sz;
	struct crypto_ahash *verify_tfm = NULL;
	struct crypto_ahash *csums_tfm = NULL;
	struct net_conf *old_net_conf, *new_net_conf = NULL;
	struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
	const int apv = connection->agreed_pro_version;
	struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
	int fifo_size = 0;
	int err;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return config_unknown_volume(connection, pi);
	device = peer_device->device;

	exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
		    : apv == 88 ? sizeof(struct p_rs_param)
					+ SHARED_SECRET_MAX
		    : apv <= 94 ? sizeof(struct p_rs_param_89)
		    : /* apv >= 95 */ sizeof(struct p_rs_param_95);

	if (pi->size > exp_max_sz) {
		drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
		    pi->size, exp_max_sz);
		return -EIO;
	}

	if (apv <= 88) {
		header_size = sizeof(struct p_rs_param);
		data_size = pi->size - header_size;
	} else if (apv <= 94) {
		header_size = sizeof(struct p_rs_param_89);
		data_size = pi->size - header_size;
		D_ASSERT(device, data_size == 0);
	} else {
		header_size = sizeof(struct p_rs_param_95);
		data_size = pi->size - header_size;
		D_ASSERT(device, data_size == 0);
	}

	/* initialize verify_alg and csums_alg */
	p = pi->data;
	memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);

	err = drbd_recv_all(peer_device->connection, p, header_size);
	if (err)
		return err;

	mutex_lock(&connection->resource->conf_update);
	old_net_conf = peer_device->connection->net_conf;
	if (get_ldev(device)) {
		new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
		if (!new_disk_conf) {
			put_ldev(device);
			mutex_unlock(&connection->resource->conf_update);
			drbd_err(device, "Allocation of new disk_conf failed\n");
			return -ENOMEM;
		}

		old_disk_conf = device->ldev->disk_conf;
		*new_disk_conf = *old_disk_conf;

		new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
	}

	if (apv >= 88) {
		if (apv == 88) {
			if (data_size > SHARED_SECRET_MAX || data_size == 0) {
				drbd_err(device, "verify-alg of wrong size, "
					"peer wants %u, accepting only up to %u byte\n",
					data_size, SHARED_SECRET_MAX);
				err = -EIO;
				goto reconnect;
			}

			err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
			if (err)
				goto reconnect;
			/* we expect NUL terminated string */
			/* but just in case someone tries to be evil */
			D_ASSERT(device, p->verify_alg[data_size-1] == 0);
			p->verify_alg[data_size-1] = 0;

		} else /* apv >= 89 */ {
			/* we still expect NUL terminated strings */
			/* but just in case someone tries to be evil */
			D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
			D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
		}

		if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
			if (device->state.conn == C_WF_REPORT_PARAMS) {
				drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
				    old_net_conf->verify_alg, p->verify_alg);
				goto disconnect;
			}
			verify_tfm = drbd_crypto_alloc_digest_safe(device,
					p->verify_alg, "verify-alg");
			if (IS_ERR(verify_tfm)) {
				verify_tfm = NULL;
				goto disconnect;
			}
		}

		if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
			if (device->state.conn == C_WF_REPORT_PARAMS) {
				drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
				    old_net_conf->csums_alg, p->csums_alg);
				goto disconnect;
			}
			csums_tfm = drbd_crypto_alloc_digest_safe(device,
					p->csums_alg, "csums-alg");
			if (IS_ERR(csums_tfm)) {
				csums_tfm = NULL;
				goto disconnect;
			}
		}

		if (apv > 94 && new_disk_conf) {
			new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
			new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
			new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
			new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);

			fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
			if (fifo_size != device->rs_plan_s->size) {
				new_plan = fifo_alloc(fifo_size);
				if (!new_plan) {
					drbd_err(device, "kmalloc of fifo_buffer failed");
					put_ldev(device);
					goto disconnect;
				}
			}
		}

		if (verify_tfm || csums_tfm) {
			new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
			if (!new_net_conf) {
				drbd_err(device, "Allocation of new net_conf failed\n");
				goto disconnect;
			}

			*new_net_conf = *old_net_conf;

			if (verify_tfm) {
				strcpy(new_net_conf->verify_alg, p->verify_alg);
				new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
				crypto_free_ahash(peer_device->connection->verify_tfm);
				peer_device->connection->verify_tfm = verify_tfm;
				drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
			}
			if (csums_tfm) {
				strcpy(new_net_conf->csums_alg, p->csums_alg);
				new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
				crypto_free_ahash(peer_device->connection->csums_tfm);
				peer_device->connection->csums_tfm = csums_tfm;
				drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
			}
			rcu_assign_pointer(connection->net_conf, new_net_conf);
		}
	}

	if (new_disk_conf) {
		rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
		put_ldev(device);
	}

	if (new_plan) {
		old_plan = device->rs_plan_s;
		rcu_assign_pointer(device->rs_plan_s, new_plan);
	}

	mutex_unlock(&connection->resource->conf_update);
	synchronize_rcu();
	if (new_net_conf)
		kfree(old_net_conf);
	kfree(old_disk_conf);
	kfree(old_plan);

	return 0;

reconnect:
	if (new_disk_conf) {
		put_ldev(device);
		kfree(new_disk_conf);
	}
	mutex_unlock(&connection->resource->conf_update);
	return -EIO;

disconnect:
	kfree(new_plan);
	if (new_disk_conf) {
		put_ldev(device);
		kfree(new_disk_conf);
	}
	mutex_unlock(&connection->resource->conf_update);
	/* just for completeness: actually not needed,
	 * as this is not reached if csums_tfm was ok. */
	crypto_free_ahash(csums_tfm);
	/* but free the verify_tfm again, if csums_tfm did not work out */
	crypto_free_ahash(verify_tfm);
	conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
	return -EIO;
}
3740 static void warn_if_differ_considerably(struct drbd_device
*device
,
3741 const char *s
, sector_t a
, sector_t b
)
3744 if (a
== 0 || b
== 0)
3746 d
= (a
> b
) ? (a
- b
) : (b
- a
);
3747 if (d
> (a
>>3) || d
> (b
>>3))
3748 drbd_warn(device
, "Considerable difference in %s: %llus vs. %llus\n", s
,
3749 (unsigned long long)a
, (unsigned long long)b
);
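
/* Worked example (illustration only): a >> 3 is a/8 = 12.5% of a.  With
 * a = 1000 and b = 1200 sectors, d = 200 > 1000/8 = 125, so the sizes
 * "differ considerably" and the warning is printed; with b = 1100,
 * d = 100 stays below both thresholds and nothing is logged. */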
static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_sizes *p = pi->data;
	enum determine_dev_size dd = DS_UNCHANGED;
	sector_t p_size, p_usize, p_csize, my_usize;
	int ldsc = 0; /* local disk size changed */
	enum dds_flags ddsf;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return config_unknown_volume(connection, pi);
	device = peer_device->device;

	p_size = be64_to_cpu(p->d_size);
	p_usize = be64_to_cpu(p->u_size);
	p_csize = be64_to_cpu(p->c_size);

	/* just store the peer's disk size for now.
	 * we still need to figure out whether we accept that. */
	device->p_size = p_size;

	if (get_ldev(device)) {
		rcu_read_lock();
		my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
		rcu_read_unlock();

		warn_if_differ_considerably(device, "lower level device sizes",
			   p_size, drbd_get_max_capacity(device->ldev));
		warn_if_differ_considerably(device, "user requested size",
					    p_usize, my_usize);

		/* if this is the first connect, or an otherwise expected
		 * param exchange, choose the minimum */
		if (device->state.conn == C_WF_REPORT_PARAMS)
			p_usize = min_not_zero(my_usize, p_usize);

		/* Never shrink a device with usable data during connect.
		   But allow online shrinking if we are connected. */
		if (drbd_new_dev_size(device, device->ldev, p_usize, 0) <
		    drbd_get_capacity(device->this_bdev) &&
		    device->state.disk >= D_OUTDATED &&
		    device->state.conn < C_CONNECTED) {
			drbd_err(device, "The peer's disk size is too small!\n");
			conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
			put_ldev(device);
			return -EIO;
		}

		if (my_usize != p_usize) {
			struct disk_conf *old_disk_conf, *new_disk_conf = NULL;

			new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
			if (!new_disk_conf) {
				drbd_err(device, "Allocation of new disk_conf failed\n");
				put_ldev(device);
				return -ENOMEM;
			}

			mutex_lock(&connection->resource->conf_update);
			old_disk_conf = device->ldev->disk_conf;
			*new_disk_conf = *old_disk_conf;
			new_disk_conf->disk_size = p_usize;

			rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
			mutex_unlock(&connection->resource->conf_update);
			synchronize_rcu();
			kfree(old_disk_conf);

			drbd_info(device, "Peer sets u_size to %lu sectors\n",
				 (unsigned long)my_usize);
		}

		put_ldev(device);
	}

	device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
	/* Leave drbd_reconsider_max_bio_size() before drbd_determine_dev_size().
	   In case we cleared the QUEUE_FLAG_DISCARD from our queue in
	   drbd_reconsider_max_bio_size(), we can be sure that after
	   drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
	ddsf = be16_to_cpu(p->dds_flags);
	if (get_ldev(device)) {
		drbd_reconsider_max_bio_size(device, device->ldev);
		dd = drbd_determine_dev_size(device, ddsf, NULL);
		put_ldev(device);
		if (dd == DS_ERROR)
			return -EIO;
		drbd_md_sync(device);
	} else {
		/*
		 * I am diskless, need to accept the peer's *current* size.
		 * I must NOT accept the peer's backing disk size,
		 * it may have been larger than mine all along...
		 *
		 * At this point, the peer knows more about my disk, or at
		 * least about what we last agreed upon, than myself.
		 * So if his c_size is less than his d_size, the most likely
		 * reason is that *my* d_size was smaller last time we checked.
		 *
		 * However, if he sends a zero current size,
		 * take his (user-capped or) backing disk size anyways.
		 */
		drbd_reconsider_max_bio_size(device, NULL);
		drbd_set_my_capacity(device, p_csize ?: p_usize ?: p_size);
	}

	if (get_ldev(device)) {
		if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
			device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
			ldsc = 1;
		}

		put_ldev(device);
	}

	if (device->state.conn > C_WF_REPORT_PARAMS) {
		if (be64_to_cpu(p->c_size) !=
		    drbd_get_capacity(device->this_bdev) || ldsc) {
			/* we have different sizes, probably peer
			 * needs to know my new size... */
			drbd_send_sizes(peer_device, 0, ddsf);
		}
		if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
		    (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
			if (device->state.pdsk >= D_INCONSISTENT &&
			    device->state.disk >= D_INCONSISTENT) {
				if (ddsf & DDSF_NO_RESYNC)
					drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
				else
					resync_after_online_grow(device);
			} else
				set_bit(RESYNC_AFTER_NEG, &device->flags);
		}
	}

	return 0;
}
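
/* Worked example (illustration only): d_size is the peer's backing device
 * capacity, u_size a user-requested cap (0 = none), c_size the currently
 * agreed capacity.  On first connect both sides pick the smaller non-zero
 * user cap: min_not_zero(0, 1048576) == 1048576 sectors, while
 * min_not_zero(524288, 1048576) == 524288, so the effective device ends up
 * no larger than either side allows. */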
static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_uuids *p = pi->data;
	u64 *p_uuid;
	int i, updated_uuids = 0;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return config_unknown_volume(connection, pi);
	device = peer_device->device;

	p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
	if (!p_uuid) {
		drbd_err(device, "kmalloc of p_uuid failed\n");
		return false;
	}

	for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
		p_uuid[i] = be64_to_cpu(p->uuid[i]);

	kfree(device->p_uuid);
	device->p_uuid = p_uuid;

	if (device->state.conn < C_CONNECTED &&
	    device->state.disk < D_INCONSISTENT &&
	    device->state.role == R_PRIMARY &&
	    (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
		drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
		    (unsigned long long)device->ed_uuid);
		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
		return -EIO;
	}

	if (get_ldev(device)) {
		int skip_initial_sync =
			device->state.conn == C_CONNECTED &&
			peer_device->connection->agreed_pro_version >= 90 &&
			device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
			(p_uuid[UI_FLAGS] & 8);
		if (skip_initial_sync) {
			drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
			drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
					"clear_n_write from receive_uuids",
					BM_LOCKED_TEST_ALLOWED);
			_drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
			_drbd_uuid_set(device, UI_BITMAP, 0);
			_drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
					CS_VERBOSE, NULL);
			drbd_md_sync(device);
			updated_uuids = 1;
		}
		put_ldev(device);
	} else if (device->state.disk < D_INCONSISTENT &&
		   device->state.role == R_PRIMARY) {
		/* I am a diskless primary, the peer just created a new current UUID
		   for me. */
		updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
	}

	/* Before we test for the disk state, we should wait until any ongoing
	   cluster-wide state change has finished. That is important if we are
	   primary and are detaching from our disk. We need to see the new
	   disk state... */
	mutex_lock(device->state_mutex);
	mutex_unlock(device->state_mutex);
	if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
		updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);

	if (updated_uuids)
		drbd_print_uuids(device, "receiver updated UUIDs to");

	return 0;
}
/**
 * convert_state() - Converts the peer's view of the cluster state to our point of view
 * @ps:		The state as seen by the peer.
 */
static union drbd_state convert_state(union drbd_state ps)
{
	union drbd_state ms;

	static enum drbd_conns c_tab[] = {
		[C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
		[C_CONNECTED] = C_CONNECTED,

		[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
		[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
		[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
		[C_VERIFY_S]       = C_VERIFY_T,
		[C_MASK]   = C_MASK,
	};

	ms.i = ps.i;

	ms.conn = c_tab[ps.conn];
	ms.peer = ps.role;
	ms.role = ps.peer;
	ms.pdsk = ps.disk;
	ms.disk = ps.pdsk;
	ms.peer_isp = (ps.aftr_isp | ps.user_isp);

	return ms;
}
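
/* Illustration only (not used by the driver): role and disk state swap
 * sides, and directional connection states flip.  If the peer reports
 * "I am Primary, my peer's disk is Inconsistent, I am StartingSyncS",
 * then from our point of view the peer is Primary, *our* disk is
 * Inconsistent, and we are StartingSyncT. */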
static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_req_state *p = pi->data;
	union drbd_state mask, val;
	enum drbd_state_rv rv;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	mask.i = be32_to_cpu(p->mask);
	val.i = be32_to_cpu(p->val);

	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
	    mutex_is_locked(device->state_mutex)) {
		drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
		return 0;
	}

	mask = convert_state(mask);
	val = convert_state(val);

	rv = drbd_change_state(device, CS_VERBOSE, mask, val);
	drbd_send_sr_reply(peer_device, rv);

	drbd_md_sync(device);

	return 0;
}
static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
{
	struct p_req_state *p = pi->data;
	union drbd_state mask, val;
	enum drbd_state_rv rv;

	mask.i = be32_to_cpu(p->mask);
	val.i = be32_to_cpu(p->val);

	if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
	    mutex_is_locked(&connection->cstate_mutex)) {
		conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
		return 0;
	}

	mask = convert_state(mask);
	val = convert_state(val);

	rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
	conn_send_sr_reply(connection, rv);

	return 0;
}
static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_state *p = pi->data;
	union drbd_state os, ns, peer_state;
	enum drbd_disk_state real_peer_disk;
	enum chg_state_flags cs_flags;
	int rv;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return config_unknown_volume(connection, pi);
	device = peer_device->device;

	peer_state.i = be32_to_cpu(p->state);

	real_peer_disk = peer_state.disk;
	if (peer_state.disk == D_NEGOTIATING) {
		real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
		drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
	}

	spin_lock_irq(&device->resource->req_lock);
 retry:
	os = ns = drbd_read_state(device);
	spin_unlock_irq(&device->resource->req_lock);

	/* If some other part of the code (ack_receiver thread, timeout)
	 * already decided to close the connection again,
	 * we must not "re-establish" it here. */
	if (os.conn <= C_TEAR_DOWN)
		return -ECONNRESET;

	/* If this is the "end of sync" confirmation, usually the peer disk
	 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
	 * set) resync started in PausedSyncT, or if the timing of pause-/
	 * unpause-sync events has been "just right", the peer disk may
	 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
	 */
	if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
	    real_peer_disk == D_UP_TO_DATE &&
	    os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
		/* If we are (becoming) SyncSource, but peer is still in sync
		 * preparation, ignore its uptodate-ness to avoid flapping, it
		 * will change to inconsistent once the peer reaches active
		 * syncing states.
		 * It may have changed syncer-paused flags, however, so we
		 * cannot ignore this completely. */
		if (peer_state.conn > C_CONNECTED &&
		    peer_state.conn < C_SYNC_SOURCE)
			real_peer_disk = D_INCONSISTENT;

		/* if peer_state changes to connected at the same time,
		 * it explicitly notifies us that it finished resync.
		 * Maybe we should finish it up, too? */
		else if (os.conn >= C_SYNC_SOURCE &&
			 peer_state.conn == C_CONNECTED) {
			if (drbd_bm_total_weight(device) <= device->rs_failed)
				drbd_resync_finished(device);
			return 0;
		}
	}

	/* explicit verify finished notification, stop sector reached. */
	if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
	    peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
		ov_out_of_sync_print(device);
		drbd_resync_finished(device);
		return 0;
	}

	/* peer says his disk is inconsistent, while we think it is uptodate,
	 * and this happens while the peer still thinks we have a sync going on,
	 * but we think we are already done with the sync.
	 * We ignore this to avoid flapping pdsk.
	 * This should not happen, if the peer is a recent version of drbd. */
	if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
	    os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
		real_peer_disk = D_UP_TO_DATE;

	if (ns.conn == C_WF_REPORT_PARAMS)
		ns.conn = C_CONNECTED;

	if (peer_state.conn == C_AHEAD)
		ns.conn = C_BEHIND;

	if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
	    get_ldev_if_state(device, D_NEGOTIATING)) {
		int cr; /* consider resync */

		/* if we established a new connection */
		cr  = (os.conn < C_CONNECTED);
		/* if we had an established connection
		 * and one of the nodes newly attaches a disk */
		cr |= (os.conn == C_CONNECTED &&
		       (peer_state.disk == D_NEGOTIATING ||
			os.disk == D_NEGOTIATING));
		/* if we have both been inconsistent, and the peer has been
		 * forced to be UpToDate with --overwrite-data */
		cr |= test_bit(CONSIDER_RESYNC, &device->flags);
		/* if we had been plain connected, and the admin requested to
		 * start a sync by "invalidate" or "invalidate-remote" */
		cr |= (os.conn == C_CONNECTED &&
		       (peer_state.conn >= C_STARTING_SYNC_S &&
			peer_state.conn <= C_WF_BITMAP_T));

		if (cr)
			ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);

		put_ldev(device);
		if (ns.conn == C_MASK) {
			ns.conn = C_CONNECTED;
			if (device->state.disk == D_NEGOTIATING) {
				drbd_force_state(device, NS(disk, D_FAILED));
			} else if (peer_state.disk == D_NEGOTIATING) {
				drbd_err(device, "Disk attach process on the peer node was aborted.\n");
				peer_state.disk = D_DISKLESS;
				real_peer_disk = D_DISKLESS;
			} else {
				if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
					return -EIO;
				D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
				conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
				return -EIO;
			}
		}
	}

	spin_lock_irq(&device->resource->req_lock);
	if (os.i != drbd_read_state(device).i)
		goto retry;
	clear_bit(CONSIDER_RESYNC, &device->flags);
	ns.peer = peer_state.role;
	ns.pdsk = real_peer_disk;
	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
	if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
		ns.disk = device->new_state_tmp.disk;
	cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
	if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
	    test_bit(NEW_CUR_UUID, &device->flags)) {
		/* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
		   for temporary network outages! */
		spin_unlock_irq(&device->resource->req_lock);
		drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
		tl_clear(peer_device->connection);
		drbd_uuid_new_current(device);
		clear_bit(NEW_CUR_UUID, &device->flags);
		conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
		return -EIO;
	}
	rv = _drbd_set_state(device, ns, cs_flags, NULL);
	ns = drbd_read_state(device);
	spin_unlock_irq(&device->resource->req_lock);

	if (rv < SS_SUCCESS) {
		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
		return -EIO;
	}

	if (os.conn > C_WF_REPORT_PARAMS) {
		if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
		    peer_state.disk != D_NEGOTIATING) {
			/* we want resync, peer has not yet decided to sync... */
			/* Nowadays only used when forcing a node into primary role and
			   setting its disk to UpToDate with that */
			drbd_send_uuids(peer_device);
			drbd_send_current_state(peer_device);
		}
	}

	clear_bit(DISCARD_MY_DATA, &device->flags);

	drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */

	return 0;
}
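/* receive_state() is the heart of the connection handshake.  Roughly:
 *   1. translate the peer's disk state (D_NEGOTIATING is resolved via the
 *      peer's UI_FLAGS),
 *   2. decide whether a resync is warranted (drbd_sync_handshake),
 *   3. fold the peer's state into our own and commit it under req_lock,
 *      retrying via "goto retry" if our state changed concurrently. */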
static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_rs_uuid *p = pi->data;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	wait_event(device->misc_wait,
		   device->state.conn == C_WF_SYNC_UUID ||
		   device->state.conn == C_BEHIND ||
		   device->state.conn < C_CONNECTED ||
		   device->state.disk < D_NEGOTIATING);

	/* D_ASSERT(device, device->state.conn == C_WF_SYNC_UUID ); */

	/* Here the _drbd_uuid_ functions are right, current should
	   _not_ be rotated into the history */
	if (get_ldev_if_state(device, D_NEGOTIATING)) {
		_drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
		_drbd_uuid_set(device, UI_BITMAP, 0UL);

		drbd_print_uuids(device, "updated sync uuid");
		drbd_start_resync(device, C_SYNC_TARGET);

		put_ldev(device);
	} else
		drbd_err(device, "Ignoring SyncUUID packet!\n");

	return 0;
}
/**
 * receive_bitmap_plain
 *
 * Return 0 when done, 1 when another iteration is needed, and a negative error
 * code upon failure.
 */
static int
receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
		     unsigned long *p, struct bm_xfer_ctx *c)
{
	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
				 drbd_header_size(peer_device->connection);
	unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
				       c->bm_words - c->word_offset);
	unsigned int want = num_words * sizeof(*p);
	int err;

	if (want != size) {
		drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
		return -EIO;
	}
	if (want == 0)
		return 0;
	err = drbd_recv_all(peer_device->connection, p, want);
	if (err)
		return err;

	drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);

	c->word_offset += num_words;
	c->bit_offset = c->word_offset * BITS_PER_LONG;
	if (c->bit_offset > c->bm_bits)
		c->bit_offset = c->bm_bits;

	return 1;
}
static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
{
	return (enum drbd_bitmap_code)(p->encoding & 0x0f);
}

static int dcbp_get_start(struct p_compressed_bm *p)
{
	return (p->encoding & 0x80) != 0;
}

static int dcbp_get_pad_bits(struct p_compressed_bm *p)
{
	return (p->encoding >> 4) & 0x7;
}
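/* Layout of p_compressed_bm->encoding, as implied by the accessors above:
 *   bit 7     : whether the first decoded run is a run of set bits
 *   bits 6..4 : number of padding bits at the end of the bit stream
 *   bits 3..0 : the drbd_bitmap_code (currently only RLE_VLI_Bits)
 * E.g. an encoding byte of 0x92 means: first run is "set", 1 pad bit,
 * code 2. */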
/**
 * recv_bm_rle_bits
 *
 * Return 0 when done, 1 when another iteration is needed, and a negative error
 * code upon failure.
 */
static int
recv_bm_rle_bits(struct drbd_peer_device *peer_device,
		 struct p_compressed_bm *p,
		 struct bm_xfer_ctx *c,
		 unsigned int len)
{
	struct bitstream bs;
	u64 look_ahead;
	u64 rl;
	u64 tmp;
	unsigned long s = c->bit_offset;
	unsigned long e;
	int toggle = dcbp_get_start(p);
	int have;
	int bits;

	bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));

	bits = bitstream_get_bits(&bs, &look_ahead, 64);
	if (bits < 0)
		return -EIO;

	for (have = bits; have > 0; s += rl, toggle = !toggle) {
		bits = vli_decode_bits(&rl, look_ahead);
		if (bits <= 0)
			return -EIO;

		if (toggle) {
			e = s + rl - 1;
			if (e >= c->bm_bits) {
				drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
				return -EIO;
			}
			_drbd_bm_set_bits(peer_device->device, s, e);
		}

		if (have < bits) {
			drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
				have, bits, look_ahead,
				(unsigned int)(bs.cur.b - p->code),
				(unsigned int)bs.buf_len);
			return -EIO;
		}
		/* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
		if (likely(bits < 64))
			look_ahead >>= bits;
		else
			look_ahead = 0;
		have -= bits;

		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
		if (bits < 0)
			return -EIO;
		look_ahead |= tmp << have;
		have += bits;
	}

	c->bit_offset = s;
	bm_xfer_ctx_bit_to_word_offset(c);

	return (s != c->bm_bits);
}
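/* The compressed bitmap is a sequence of VLI encoded run lengths of
 * alternating "clear" and "set" runs; dcbp_get_start() tells which kind
 * comes first.  E.g. with start = 0 and decoded run lengths 100, 3, 897,
 * bits 100..102 (inclusive) of this chunk are set and everything else is
 * clear.  The 64 bit look_ahead window is shifted and refilled after each
 * decoded code. */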
/**
 * decode_bitmap_c
 *
 * Return 0 when done, 1 when another iteration is needed, and a negative error
 * code upon failure.
 */
static int
decode_bitmap_c(struct drbd_peer_device *peer_device,
		struct p_compressed_bm *p,
		struct bm_xfer_ctx *c,
		unsigned int len)
{
	if (dcbp_get_code(p) == RLE_VLI_Bits)
		return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));

	/* other variants had been implemented for evaluation,
	 * but have been dropped as this one turned out to be "best"
	 * during all our tests. */

	drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
	conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
	return -EIO;
}
void INFO_bm_xfer_stats(struct drbd_device *device,
		const char *direction, struct bm_xfer_ctx *c)
{
	/* what would it take to transfer it "plaintext" */
	unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
	unsigned int plain =
		header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
		c->bm_words * sizeof(unsigned long);
	unsigned int total = c->bytes[0] + c->bytes[1];
	unsigned int r;

	/* total can not be zero. but just in case: */
	if (total == 0)
		return;

	/* don't report if not compressed */
	if (total >= plain)
		return;

	/* total < plain. check for overflow, still */
	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
				    : (1000 * total / plain);

	if (r > 1000)
		r = 1000;

	r = 1000 - r;
	drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
	     "total %u; compression: %u.%u%%\n",
			direction,
			c->bytes[1], c->packets[1],
			c->bytes[0], c->packets[0],
			total, r/10, r % 10);
}
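/* The compression ratio is reported in tenths of a percent: with
 * r = 1000 - 1000 * total / plain, a transfer that needed only 1% of the
 * plaintext size yields r = 990, printed as "compression: 99.0%". */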
/* Since we are processing the bitfield from lower addresses to higher,
   it does not matter if we process it in 32 bit chunks or 64 bit
   chunks as long as it is little endian. (Understand it as byte stream,
   beginning with the lowest byte...) If we would use big endian
   we would need to process it from the highest address to the lowest,
   in order to be agnostic to the 32 vs 64 bits issue.

   Returns 0 on success, a negative error code otherwise. */
static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct bm_xfer_ctx c;
	int err;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
	/* you are supposed to send additional out-of-sync information
	 * if you actually set bits during this phase */

	c = (struct bm_xfer_ctx) {
		.bm_bits = drbd_bm_bits(device),
		.bm_words = drbd_bm_words(device),
	};

	for(;;) {
		if (pi->cmd == P_BITMAP)
			err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
		else if (pi->cmd == P_COMPRESSED_BITMAP) {
			/* MAYBE: sanity check that we speak proto >= 90,
			 * and the feature is enabled! */
			struct p_compressed_bm *p = pi->data;

			if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
				drbd_err(device, "ReportCBitmap packet too large\n");
				err = -EIO;
				goto out;
			}
			if (pi->size <= sizeof(*p)) {
				drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
				err = -EIO;
				goto out;
			}
			err = drbd_recv_all(peer_device->connection, p, pi->size);
			if (err)
				goto out;
			err = decode_bitmap_c(peer_device, p, &c, pi->size);
		} else {
			drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
			err = -EIO;
			goto out;
		}

		c.packets[pi->cmd == P_BITMAP]++;
		c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;

		if (err <= 0) {
			if (err < 0)
				goto out;
			break;
		}
		err = drbd_recv_header(peer_device->connection, pi);
		if (err)
			goto out;
	}

	INFO_bm_xfer_stats(device, "receive", &c);

	if (device->state.conn == C_WF_BITMAP_T) {
		enum drbd_state_rv rv;

		err = drbd_send_bitmap(device);
		if (err)
			goto out;
		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
		rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
		D_ASSERT(device, rv == SS_SUCCESS);
	} else if (device->state.conn != C_WF_BITMAP_S) {
		/* admin may have requested C_DISCONNECTING,
		 * other threads may have noticed network errors */
		drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
		    drbd_conn_str(device->state.conn));
	}
	err = 0;

 out:
	drbd_bm_unlock(device);
	if (!err && device->state.conn == C_WF_BITMAP_S)
		drbd_start_resync(device, C_SYNC_SOURCE);
	return err;
}
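/* Bitmap exchange is symmetric: the SyncTarget (C_WF_BITMAP_T) merges the
 * received bitmap, sends its own back (drbd_send_bitmap) and then requests
 * C_WF_SYNC_UUID; the SyncSource (C_WF_BITMAP_S) starts the resync as soon
 * as its own receive completes.  Any other cstate here means the connection
 * is already being torn down. */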
static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
{
	drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
		 pi->cmd, pi->size);

	return ignore_remaining_packet(connection, pi);
}
static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
{
	/* Make sure we've acked all the TCP data associated
	 * with the data requests being unplugged */
	drbd_tcp_quickack(connection->data.socket);

	return 0;
}
static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_block_desc *p = pi->data;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	switch (device->state.conn) {
	case C_WF_SYNC_UUID:
	case C_WF_BITMAP_T:
	case C_BEHIND:
		break;
	default:
		drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
				drbd_conn_str(device->state.conn));
	}

	drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));

	return 0;
}
struct data_cmd {
	int expect_payload;
	size_t pkt_size;
	int (*fn)(struct drbd_connection *, struct packet_info *);
};

static struct data_cmd drbd_cmd_handler[] = {
	[P_DATA]	    = { 1, sizeof(struct p_data), receive_Data },
	[P_DATA_REPLY]	    = { 1, sizeof(struct p_data), receive_DataReply },
	[P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply },
	[P_BARRIER]	    = { 0, sizeof(struct p_barrier), receive_Barrier },
	[P_BITMAP]	    = { 1, 0, receive_bitmap },
	[P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap },
	[P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
	[P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
	[P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
	[P_SYNC_PARAM]	    = { 1, 0, receive_SyncParam },
	[P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
	[P_PROTOCOL]	    = { 1, sizeof(struct p_protocol), receive_protocol },
	[P_UUIDS]	    = { 0, sizeof(struct p_uuids), receive_uuids },
	[P_SIZES]	    = { 0, sizeof(struct p_sizes), receive_sizes },
	[P_STATE]	    = { 0, sizeof(struct p_state), receive_state },
	[P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
	[P_SYNC_UUID]	    = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
	[P_OV_REQUEST]	    = { 0, sizeof(struct p_block_req), receive_DataRequest },
	[P_OV_REPLY]	    = { 1, sizeof(struct p_block_req), receive_DataRequest },
	[P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
	[P_DELAY_PROBE]	    = { 0, sizeof(struct p_delay_probe93), receive_skip },
	[P_OUT_OF_SYNC]	    = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
	[P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
	[P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
	[P_TRIM]	    = { 0, sizeof(struct p_trim), receive_Data },
};
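/* The table is indexed by packet type.  expect_payload == 1 marks commands
 * that may carry more data than the fixed sub header (pkt_size); drbdd()
 * below reads exactly pkt_size bytes here and leaves the rest for the
 * handler (e.g. the actual block data of P_DATA). */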
static void drbdd(struct drbd_connection *connection)
{
	struct packet_info pi;
	size_t shs; /* sub header size */
	int err;

	while (get_t_state(&connection->receiver) == RUNNING) {
		struct data_cmd *cmd;

		drbd_thread_current_set_cpu(&connection->receiver);
		update_receiver_timing_details(connection, drbd_recv_header);
		if (drbd_recv_header(connection, &pi))
			goto err_out;

		cmd = &drbd_cmd_handler[pi.cmd];
		if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
			drbd_err(connection, "Unexpected data packet %s (0x%04x)",
				 cmdname(pi.cmd), pi.cmd);
			goto err_out;
		}

		shs = cmd->pkt_size;
		if (pi.size > shs && !cmd->expect_payload) {
			drbd_err(connection, "No payload expected %s l:%d\n",
				 cmdname(pi.cmd), pi.size);
			goto err_out;
		}

		if (shs) {
			update_receiver_timing_details(connection, drbd_recv_all_warn);
			err = drbd_recv_all_warn(connection, pi.data, shs);
			if (err)
				goto err_out;
			pi.size -= shs;
		}

		update_receiver_timing_details(connection, cmd->fn);
		err = cmd->fn(connection, &pi);
		if (err) {
			drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
				 cmdname(pi.cmd), err, pi.size);
			goto err_out;
		}
	}
	return;

 err_out:
	conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
}
static void conn_disconnect(struct drbd_connection *connection)
{
	struct drbd_peer_device *peer_device;
	enum drbd_conns oc;
	int vnr;

	if (connection->cstate == C_STANDALONE)
		return;

	/* We are about to start the cleanup after connection loss.
	 * Make sure drbd_make_request knows about that.
	 * Usually we should be in some network failure state already,
	 * but just in case we are not, we fix it up here.
	 */
	conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);

	/* ack_receiver does not clean up anything. it must not interfere, either */
	drbd_thread_stop(&connection->ack_receiver);
	if (connection->ack_sender) {
		destroy_workqueue(connection->ack_sender);
		connection->ack_sender = NULL;
	}
	drbd_free_sock(connection);

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;
		kref_get(&device->kref);
		rcu_read_unlock();
		drbd_disconnected(peer_device);
		kref_put(&device->kref, drbd_destroy_device);
		rcu_read_lock();
	}
	rcu_read_unlock();

	if (!list_empty(&connection->current_epoch->list))
		drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
	atomic_set(&connection->current_epoch->epoch_size, 0);
	connection->send.seen_any_write_yet = false;

	drbd_info(connection, "Connection closed\n");

	if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
		conn_try_outdate_peer_async(connection);

	spin_lock_irq(&connection->resource->req_lock);
	oc = connection->cstate;
	if (oc >= C_UNCONNECTED)
		_conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);

	spin_unlock_irq(&connection->resource->req_lock);

	if (oc == C_DISCONNECTING)
		conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
}
static int drbd_disconnected(struct drbd_peer_device *peer_device)
{
	struct drbd_device *device = peer_device->device;
	unsigned int i;

	/* wait for current activity to cease. */
	spin_lock_irq(&device->resource->req_lock);
	_drbd_wait_ee_list_empty(device, &device->active_ee);
	_drbd_wait_ee_list_empty(device, &device->sync_ee);
	_drbd_wait_ee_list_empty(device, &device->read_ee);
	spin_unlock_irq(&device->resource->req_lock);

	/* We do not have data structures that would allow us to
	 * get the rs_pending_cnt down to 0 again.
	 *  * On C_SYNC_TARGET we do not have any data structures describing
	 *    the pending RSDataRequest's we have sent.
	 *  * On C_SYNC_SOURCE there is no data structure that tracks
	 *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
	 *  And no, it is not the sum of the reference counts in the
	 *  resync_LRU. The resync_LRU tracks the whole operation including
	 *  the disk-IO, while the rs_pending_cnt only tracks the blocks
	 *  on the fly. */
	drbd_rs_cancel_all(device);
	device->rs_total = 0;
	device->rs_failed = 0;
	atomic_set(&device->rs_pending_cnt, 0);
	wake_up(&device->misc_wait);

	del_timer_sync(&device->resync_timer);
	resync_timer_fn((unsigned long)device);

	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
	 * w_make_resync_request etc. which may still be on the worker queue
	 * to be "canceled" */
	drbd_flush_workqueue(&peer_device->connection->sender_work);

	drbd_finish_peer_reqs(device);

	/* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
	   might have issued a work again. The one before drbd_finish_peer_reqs() is
	   necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
	drbd_flush_workqueue(&peer_device->connection->sender_work);

	/* need to do it again, drbd_finish_peer_reqs() may have populated it
	 * again via drbd_try_clear_on_disk_bm(). */
	drbd_rs_cancel_all(device);

	kfree(device->p_uuid);
	device->p_uuid = NULL;

	if (!drbd_suspended(device))
		tl_clear(peer_device->connection);

	drbd_md_sync(device);

	/* serialize with bitmap writeout triggered by the state change,
	 * if any. */
	wait_event(device->misc_wait, !test_bit(BITMAP_IO, &device->flags));

	/* tcp_close and release of sendpage pages can be deferred.  I don't
	 * want to use SO_LINGER, because apparently it can be deferred for
	 * more than 20 seconds (longest time I checked).
	 *
	 * Actually we don't care for exactly when the network stack does its
	 * put_page(), but release our reference on these pages right here.
	 */
	i = drbd_free_peer_reqs(device, &device->net_ee);
	if (i)
		drbd_info(device, "net_ee not empty, killed %u entries\n", i);
	i = atomic_read(&device->pp_in_use_by_net);
	if (i)
		drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
	i = atomic_read(&device->pp_in_use);
	if (i)
		drbd_info(device, "pp_in_use = %d, expected 0\n", i);

	D_ASSERT(device, list_empty(&device->read_ee));
	D_ASSERT(device, list_empty(&device->active_ee));
	D_ASSERT(device, list_empty(&device->sync_ee));
	D_ASSERT(device, list_empty(&device->done_ee));

	return 0;
}
/*
 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
 * we can agree on is stored in agreed_pro_version.
 *
 * feature flags and the reserved array should be enough room for future
 * enhancements of the handshake protocol, and possible plugins...
 *
 * for now, they are expected to be zero, but ignored.
 */
static int drbd_send_features(struct drbd_connection *connection)
{
	struct drbd_socket *sock;
	struct p_connection_features *p;

	sock = &connection->data;
	p = conn_prepare_command(connection, sock);
	if (!p)
		return -EIO;
	memset(p, 0, sizeof(*p));
	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
	p->feature_flags = cpu_to_be32(PRO_FEATURES);
	return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
}
/*
 * return values:
 *   1 yes, we have a valid connection
 *   0 oops, did not work out, please try again
 *  -1 peer talks different language,
 *     no point in trying again, please go standalone.
 */
static int drbd_do_features(struct drbd_connection *connection)
{
	/* ASSERT current == connection->receiver ... */
	struct p_connection_features *p;
	const int expect = sizeof(struct p_connection_features);
	struct packet_info pi;
	int err;

	err = drbd_send_features(connection);
	if (err)
		return 0;

	err = drbd_recv_header(connection, &pi);
	if (err)
		return 0;

	if (pi.cmd != P_CONNECTION_FEATURES) {
		drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
			 cmdname(pi.cmd), pi.cmd);
		return -1;
	}

	if (pi.size != expect) {
		drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
			 expect, pi.size);
		return -1;
	}

	p = pi.data;
	err = drbd_recv_all_warn(connection, p, expect);
	if (err)
		return 0;

	p->protocol_min = be32_to_cpu(p->protocol_min);
	p->protocol_max = be32_to_cpu(p->protocol_max);
	if (p->protocol_max == 0)
		p->protocol_max = p->protocol_min;

	if (PRO_VERSION_MAX < p->protocol_min ||
	    PRO_VERSION_MIN > p->protocol_max)
		goto incompat;

	connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
	connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);

	drbd_info(connection, "Handshake successful: "
	     "Agreed network protocol version %d\n", connection->agreed_pro_version);

	drbd_info(connection, "Agreed to%ssupport TRIM on protocol level\n",
		  connection->agreed_features & FF_TRIM ? " " : " not ");

	return 1;

 incompat:
	drbd_err(connection, "incompatible DRBD dialects: "
	    "I support %d-%d, peer supports %d-%d\n",
		PRO_VERSION_MIN, PRO_VERSION_MAX,
		p->protocol_min, p->protocol_max);
	return -1;
}
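/* Version negotiation example: if we supported, say, protocols 86..101 and
 * the peer announced 87..96, agreed_pro_version would become min(101, 96)
 * = 96.  Feature flags are simply ANDed, so a feature such as FF_TRIM is
 * used only if both sides advertise it. */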
#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
static int drbd_do_auth(struct drbd_connection *connection)
{
	drbd_err(connection, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
	drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
	return -1;
}
#else
#define CHALLENGE_LEN 64

/* Return value:
	1 - auth succeeded,
	0 - failed, try again (network error),
	-1 - auth failed, don't try again.
*/

static int drbd_do_auth(struct drbd_connection *connection)
{
	struct drbd_socket *sock;
	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
	char *response = NULL;
	char *right_response = NULL;
	char *peers_ch = NULL;
	unsigned int key_len;
	char secret[SHARED_SECRET_MAX]; /* 64 byte */
	unsigned int resp_size;
	SHASH_DESC_ON_STACK(desc, connection->cram_hmac_tfm);
	struct packet_info pi;
	struct net_conf *nc;
	int err, rv;

	/* FIXME: Put the challenge/response into the preallocated socket buffer.  */

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	key_len = strlen(nc->shared_secret);
	memcpy(secret, nc->shared_secret, key_len);
	rcu_read_unlock();

	desc->tfm = connection->cram_hmac_tfm;
	desc->flags = 0;

	rv = crypto_shash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
	if (rv) {
		drbd_err(connection, "crypto_shash_setkey() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	get_random_bytes(my_challenge, CHALLENGE_LEN);

	sock = &connection->data;
	if (!conn_prepare_command(connection, sock)) {
		rv = 0;
		goto fail;
	}
	rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
				my_challenge, CHALLENGE_LEN);
	if (!rv)
		goto fail;

	err = drbd_recv_header(connection, &pi);
	if (err) {
		rv = 0;
		goto fail;
	}

	if (pi.cmd != P_AUTH_CHALLENGE) {
		drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
			 cmdname(pi.cmd), pi.cmd);
		rv = 0;
		goto fail;
	}

	if (pi.size > CHALLENGE_LEN * 2) {
		drbd_err(connection, "expected AuthChallenge payload too big.\n");
		rv = -1;
		goto fail;
	}

	if (pi.size < CHALLENGE_LEN) {
		drbd_err(connection, "AuthChallenge payload too small.\n");
		rv = -1;
		goto fail;
	}

	peers_ch = kmalloc(pi.size, GFP_NOIO);
	if (peers_ch == NULL) {
		drbd_err(connection, "kmalloc of peers_ch failed\n");
		rv = -1;
		goto fail;
	}

	err = drbd_recv_all_warn(connection, peers_ch, pi.size);
	if (err) {
		rv = 0;
		goto fail;
	}

	if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
		drbd_err(connection, "Peer presented the same challenge!\n");
		rv = -1;
		goto fail;
	}

	resp_size = crypto_shash_digestsize(connection->cram_hmac_tfm);
	response = kmalloc(resp_size, GFP_NOIO);
	if (response == NULL) {
		drbd_err(connection, "kmalloc of response failed\n");
		rv = -1;
		goto fail;
	}

	rv = crypto_shash_digest(desc, peers_ch, pi.size, response);
	if (rv) {
		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	if (!conn_prepare_command(connection, sock)) {
		rv = 0;
		goto fail;
	}
	rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
				response, resp_size);
	if (!rv)
		goto fail;

	err = drbd_recv_header(connection, &pi);
	if (err) {
		rv = 0;
		goto fail;
	}

	if (pi.cmd != P_AUTH_RESPONSE) {
		drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
			 cmdname(pi.cmd), pi.cmd);
		rv = 0;
		goto fail;
	}

	if (pi.size != resp_size) {
		drbd_err(connection, "expected AuthResponse payload of wrong size\n");
		rv = 0;
		goto fail;
	}

	err = drbd_recv_all_warn(connection, response, resp_size);
	if (err) {
		rv = 0;
		goto fail;
	}

	right_response = kmalloc(resp_size, GFP_NOIO);
	if (right_response == NULL) {
		drbd_err(connection, "kmalloc of right_response failed\n");
		rv = -1;
		goto fail;
	}

	rv = crypto_shash_digest(desc, my_challenge, CHALLENGE_LEN,
				 right_response);
	if (rv) {
		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	rv = !memcmp(response, right_response, resp_size);

	if (rv)
		drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
			  resp_size);
	else
		rv = -1;

 fail:
	kfree(peers_ch);
	kfree(response);
	kfree(right_response);
	shash_desc_zero(desc);

	return rv;
}
#endif
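/* The authentication is a symmetric challenge-response run over the data
 * socket before any other packet:
 *   we   -> peer : P_AUTH_CHALLENGE, 64 random bytes
 *   peer -> we   : P_AUTH_CHALLENGE, its own random bytes
 *   we   -> peer : P_AUTH_RESPONSE, HMAC(shared_secret, peer's challenge)
 *   peer -> we   : P_AUTH_RESPONSE, HMAC(shared_secret, our challenge)
 * and the peer's response is compared against the locally computed
 * right_response.  Rejecting a peer that echoes our own challenge back
 * prevents a trivial reflection attack. */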
int drbd_receiver(struct drbd_thread *thi)
{
	struct drbd_connection *connection = thi->connection;
	int h;

	drbd_info(connection, "receiver (re)started\n");

	do {
		h = conn_connect(connection);
		if (h == 0) {
			conn_disconnect(connection);
			schedule_timeout_interruptible(HZ);
		}
		if (h == -1) {
			drbd_warn(connection, "Discarding network configuration.\n");
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
		}
	} while (h == 0);

	if (h > 0)
		drbdd(connection);

	conn_disconnect(connection);

	drbd_info(connection, "receiver terminated\n");
	return 0;
}
/* ********* acknowledge sender ******** */

static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
{
	struct p_req_state_reply *p = pi->data;
	int retcode = be32_to_cpu(p->retcode);

	if (retcode >= SS_SUCCESS) {
		set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
	} else {
		set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
		drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
			 drbd_set_st_err_str(retcode), retcode);
	}
	wake_up(&connection->ping_wait);

	return 0;
}
static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_req_state_reply *p = pi->data;
	int retcode = be32_to_cpu(p->retcode);

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
		D_ASSERT(device, connection->agreed_pro_version < 100);
		return got_conn_RqSReply(connection, pi);
	}

	if (retcode >= SS_SUCCESS) {
		set_bit(CL_ST_CHG_SUCCESS, &device->flags);
	} else {
		set_bit(CL_ST_CHG_FAIL, &device->flags);
		drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
			 drbd_set_st_err_str(retcode), retcode);
	}
	wake_up(&device->state_wait);

	return 0;
}
static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
{
	return drbd_send_ping_ack(connection);
}
static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
{
	/* restore idle timeout */
	connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
	if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
		wake_up(&connection->ping_wait);

	return 0;
}
static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_block_ack *p = pi->data;
	sector_t sector = be64_to_cpu(p->sector);
	int blksize = be32_to_cpu(p->blksize);

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);

	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));

	if (get_ldev(device)) {
		drbd_rs_complete_io(device, sector);
		drbd_set_in_sync(device, sector, blksize);
		/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
		device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
		put_ldev(device);
	}
	dec_rs_pending(device);
	atomic_add(blksize >> 9, &device->rs_sect_in);

	return 0;
}
static int
validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
			      struct rb_root *root, const char *func,
			      enum drbd_req_event what, bool missing_ok)
{
	struct drbd_request *req;
	struct bio_and_error m;

	spin_lock_irq(&device->resource->req_lock);
	req = find_request(device, root, id, sector, missing_ok, func);
	if (unlikely(!req)) {
		spin_unlock_irq(&device->resource->req_lock);
		return -EIO;
	}
	__req_mod(req, what, &m);
	spin_unlock_irq(&device->resource->req_lock);

	if (m.bio)
		complete_master_bio(device, &m);
	return 0;
}
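/* All ack handlers funnel through here: look the request up by (id, sector)
 * in the per-device request tree, feed the event into the request state
 * machine under req_lock, and complete the master bio outside the lock if
 * __req_mod() asked for it via bio_and_error. */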
static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_block_ack *p = pi->data;
	sector_t sector = be64_to_cpu(p->sector);
	int blksize = be32_to_cpu(p->blksize);
	enum drbd_req_event what;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));

	if (p->block_id == ID_SYNCER) {
		drbd_set_in_sync(device, sector, blksize);
		dec_rs_pending(device);
		return 0;
	}
	switch (pi->cmd) {
	case P_RS_WRITE_ACK:
		what = WRITE_ACKED_BY_PEER_AND_SIS;
		break;
	case P_WRITE_ACK:
		what = WRITE_ACKED_BY_PEER;
		break;
	case P_RECV_ACK:
		what = RECV_ACKED_BY_PEER;
		break;
	case P_SUPERSEDED:
		what = CONFLICT_RESOLVED;
		break;
	case P_RETRY_WRITE:
		what = POSTPONE_WRITE;
		break;
	default:
		BUG();
	}

	return validate_req_change_req_state(device, p->block_id, sector,
					     &device->write_requests, __func__,
					     what, false);
}
static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_block_ack *p = pi->data;
	sector_t sector = be64_to_cpu(p->sector);
	int size = be32_to_cpu(p->blksize);
	int err;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));

	if (p->block_id == ID_SYNCER) {
		dec_rs_pending(device);
		drbd_rs_failed_io(device, sector, size);
		return 0;
	}

	err = validate_req_change_req_state(device, p->block_id, sector,
					    &device->write_requests, __func__,
					    NEG_ACKED, true);
	if (err) {
		/* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
		   The master bio might already be completed, therefore the
		   request is no longer in the collision hash. */
		/* In Protocol B we might already have got a P_RECV_ACK
		   but then get a P_NEG_ACK afterwards. */
		drbd_set_out_of_sync(device, sector, size);
	}
	return 0;
}
static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_block_ack *p = pi->data;
	sector_t sector = be64_to_cpu(p->sector);

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));

	drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
		 (unsigned long long)sector, be32_to_cpu(p->blksize));

	return validate_req_change_req_state(device, p->block_id, sector,
					     &device->read_requests, __func__,
					     NEG_ACKED, false);
}
static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	sector_t sector;
	int size;
	struct p_block_ack *p = pi->data;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));

	dec_rs_pending(device);

	if (get_ldev_if_state(device, D_FAILED)) {
		drbd_rs_complete_io(device, sector);
		switch (pi->cmd) {
		case P_NEG_RS_DREPLY:
			drbd_rs_failed_io(device, sector, size);
			break;
		case P_RS_CANCEL:
			break;
		default:
			BUG();
		}
		put_ldev(device);
	}

	return 0;
}
static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
{
	struct p_barrier_ack *p = pi->data;
	struct drbd_peer_device *peer_device;
	int vnr;

	tl_release(connection, p->barrier, be32_to_cpu(p->set_size));

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;

		if (device->state.conn == C_AHEAD &&
		    atomic_read(&device->ap_in_flight) == 0 &&
		    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
			device->start_resync_timer.expires = jiffies + HZ;
			add_timer(&device->start_resync_timer);
		}
	}
	rcu_read_unlock();

	return 0;
}
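/* A barrier ack may also end congestion mode: a device in C_AHEAD with no
 * application writes in flight arms start_resync_timer, which moves it
 * towards SyncSource about one second later (jiffies + HZ). */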
static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_block_ack *p = pi->data;
	struct drbd_device_work *dw;
	sector_t sector;
	int size;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));

	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
		drbd_ov_out_of_sync_found(device, sector, size);
	else
		ov_out_of_sync_print(device);

	if (!get_ldev(device))
		return 0;

	drbd_rs_complete_io(device, sector);
	dec_rs_pending(device);

	--device->ov_left;

	/* let's advance progress step marks only for every other megabyte */
	if ((device->ov_left & 0x200) == 0x200)
		drbd_advance_rs_marks(device, device->ov_left);

	if (device->ov_left == 0) {
		dw = kmalloc(sizeof(*dw), GFP_NOIO);
		if (dw) {
			dw->w.cb = w_ov_finished;
			dw->device = device;
			drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
		} else {
			drbd_err(device, "kmalloc(dw) failed.");
			ov_out_of_sync_print(device);
			drbd_resync_finished(device);
		}
	}
	put_ldev(device);
	return 0;
}
static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
{
	return 0;
}

struct meta_sock_cmd {
	size_t pkt_size;
	int (*fn)(struct drbd_connection *connection, struct packet_info *);
};
static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout)
{
	long t;
	struct net_conf *nc;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	t = ping_timeout ? nc->ping_timeo : nc->ping_int;
	rcu_read_unlock();

	t *= HZ;
	if (ping_timeout)
		t /= 10;

	connection->meta.socket->sk->sk_rcvtimeo = t;
}

static void set_ping_timeout(struct drbd_connection *connection)
{
	set_rcvtimeo(connection, 1);
}

static void set_idle_timeout(struct drbd_connection *connection)
{
	set_rcvtimeo(connection, 0);
}
static struct meta_sock_cmd ack_receiver_tbl[] = {
	[P_PING]	    = { 0, got_Ping },
	[P_PING_ACK]	    = { 0, got_PingAck },
	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_SUPERSEDED]	    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
	[P_DELAY_PROBE]	    = { sizeof(struct p_delay_probe93), got_skip },
	[P_RS_CANCEL]	    = { sizeof(struct p_block_ack), got_NegRSDReply },
	[P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
	[P_RETRY_WRITE]	    = { sizeof(struct p_block_ack), got_BlockAck },
};
int drbd_ack_receiver(struct drbd_thread *thi)
{
	struct drbd_connection *connection = thi->connection;
	struct meta_sock_cmd *cmd = NULL;
	struct packet_info pi;
	unsigned long pre_recv_jif;
	int rv;
	void *buf    = connection->meta.rbuf;
	int received = 0;
	unsigned int header_size = drbd_header_size(connection);
	int expect   = header_size;
	bool ping_timeout_active = false;
	struct sched_param param = { .sched_priority = 2 };

	rv = sched_setscheduler(current, SCHED_RR, &param);
	if (rv < 0)
		drbd_err(connection, "drbd_ack_receiver: ERROR set priority, ret=%d\n", rv);

	while (get_t_state(thi) == RUNNING) {
		drbd_thread_current_set_cpu(thi);

		conn_reclaim_net_peer_reqs(connection);

		if (test_and_clear_bit(SEND_PING, &connection->flags)) {
			if (drbd_send_ping(connection)) {
				drbd_err(connection, "drbd_send_ping has failed\n");
				goto reconnect;
			}
			set_ping_timeout(connection);
			ping_timeout_active = true;
		}

		pre_recv_jif = jiffies;
		rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);

		/* Note:
		 * -EINTR	 (on meta) we got a signal
		 * -EAGAIN	 (on meta) rcvtimeo expired
		 * -ECONNRESET	 other side closed the connection
		 * -ERESTARTSYS  (on data) we got a signal
		 * rv <  0	 other than above: unexpected error!
		 * rv == expected: full header or command
		 * rv  < expected: "woken" by signal during receive
		 * rv == 0	 : "connection shut down by peer"
		 */
		if (likely(rv > 0)) {
			received += rv;
			buf	 += rv;
		} else if (rv == 0) {
			if (test_bit(DISCONNECT_SENT, &connection->flags)) {
				long t;

				rcu_read_lock();
				t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
				rcu_read_unlock();

				t = wait_event_timeout(connection->ping_wait,
						       connection->cstate < C_WF_REPORT_PARAMS,
						       t);
				if (t)
					break;
			}
			drbd_err(connection, "meta connection shut down by peer.\n");
			goto reconnect;
		} else if (rv == -EAGAIN) {
			/* If the data socket received something meanwhile,
			 * that is good enough: peer is still alive. */
			if (time_after(connection->last_received, pre_recv_jif))
				continue;
			if (ping_timeout_active) {
				drbd_err(connection, "PingAck did not arrive in time.\n");
				goto reconnect;
			}
			set_bit(SEND_PING, &connection->flags);
			continue;
		} else if (rv == -EINTR) {
			/* maybe drbd_thread_stop(): the while condition will notice.
			 * maybe woken for send_ping: we'll send a ping above,
			 * and change the rcvtimeo */
			flush_signals(current);
			continue;
		} else {
			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
			goto reconnect;
		}

		if (received == expect && cmd == NULL) {
			if (decode_header(connection, connection->meta.rbuf, &pi))
				goto reconnect;
			cmd = &ack_receiver_tbl[pi.cmd];
			if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) || !cmd->fn) {
				drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
					 cmdname(pi.cmd), pi.cmd);
				goto disconnect;
			}
			expect = header_size + cmd->pkt_size;
			if (pi.size != expect - header_size) {
				drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
					 pi.cmd, pi.size);
				goto reconnect;
			}
		}
		if (received == expect) {
			int err;

			err = cmd->fn(connection, &pi);
			if (err) {
				drbd_err(connection, "%pf failed\n", cmd->fn);
				goto reconnect;
			}

			connection->last_received = jiffies;

			if (cmd == &ack_receiver_tbl[P_PING_ACK]) {
				set_idle_timeout(connection);
				ping_timeout_active = false;
			}

			buf	 = connection->meta.rbuf;
			received = 0;
			expect	 = header_size;
			cmd	 = NULL;
		}
	}

	if (0) {
reconnect:
		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
		conn_md_sync(connection);
	}
	if (0) {
disconnect:
		conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
	}

	drbd_info(connection, "ack_receiver terminated\n");

	return 0;
}
void drbd_send_acks_wf(struct work_struct *ws)
{
	struct drbd_peer_device *peer_device =
		container_of(ws, struct drbd_peer_device, send_acks_work);
	struct drbd_connection *connection = peer_device->connection;
	struct drbd_device *device = peer_device->device;
	struct net_conf *nc;
	int tcp_cork, err;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	tcp_cork = nc->tcp_cork;
	rcu_read_unlock();

	if (tcp_cork)
		drbd_tcp_cork(connection->meta.socket);

	err = drbd_finish_peer_reqs(device);
	kref_put(&device->kref, drbd_destroy_device);
	/* get is in drbd_endio_write_sec_final(). That is necessary to keep the
	   struct work_struct send_acks_work alive, which is in the peer_device object */

	if (err) {
		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
		return;
	}

	if (tcp_cork)
		drbd_tcp_uncork(connection->meta.socket);

	return;
}