/*
   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */
#include <linux/module.h>

#include <linux/uaccess.h>
#include <linux/drbd.h>
#include <linux/file.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <uapi/linux/sched/types.h>
#include <linux/sched/signal.h>
#include <linux/pkt_sched.h>
#define __KERNEL_SYSCALLS__
#include <linux/unistd.h>
#include <linux/vmalloc.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>
#include "drbd_protocol.h"

#define PRO_FEATURES (DRBD_FF_TRIM|DRBD_FF_THIN_RESYNC|DRBD_FF_WSAME)
static int drbd_do_features(struct drbd_connection *connection);
static int drbd_do_auth(struct drbd_connection *connection);
static int drbd_disconnected(struct drbd_peer_device *);
static void conn_wait_active_ee_empty(struct drbd_connection *connection);
static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
static int e_end_block(struct drbd_work *, int);

#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
/*
 * Some helper functions to deal with singly linked page lists,
 * page->private being our "next" pointer.
 */

/* If at least n pages are linked at head, get n pages off.
 * Otherwise, don't modify head, and return NULL.
 * Locking is the responsibility of the caller.
 */
static struct page *page_chain_del(struct page **head, int n)
{
	struct page *page = *head;
	struct page *tmp;

	if (!page)
		return NULL;

	while (page) {
		tmp = page_chain_next(page);
		if (--n == 0)
			break; /* found sufficient pages */
		if (tmp == NULL)
			/* insufficient pages, don't use any of them. */
			return NULL;
		page = tmp;
	}

	/* add end of list marker for the returned list */
	set_page_private(page, 0);
	/* actual return value, and adjustment of head */
	page = *head;
	*head = tmp;
	return page;
}
/* may be used outside of locks to find the tail of a (usually short)
 * "private" page chain, before adding it back to a global chain head
 * with page_chain_add() under a spinlock. */
static struct page *page_chain_tail(struct page *page, int *len)
{
	struct page *tmp;
	int i = 1;

	while ((tmp = page_chain_next(page))) {
		++i;
		page = tmp;
	}
	if (len)
		*len = i;
	return page;
}
static int page_chain_free(struct page *page)
{
	struct page *tmp;
	int i = 0;

	page_chain_for_each_safe(page, tmp) {
		put_page(page);
		++i;
	}
	return i;
}
static void page_chain_add(struct page **head,
		struct page *chain_first, struct page *chain_last)
{
	struct page *tmp;

	tmp = page_chain_tail(chain_first, NULL);
	BUG_ON(tmp != chain_last);

	/* add chain to head */
	set_page_private(chain_last, (unsigned long)*head);
	*head = chain_first;
}
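/* A minimal sketch of the chain-walking helpers used above; the real
 * page_chain_next()/page_chain_for_each() definitions live in drbd_int.h,
 * so their exact shape here is an assumption:
 *
 *	#define page_chain_next(page) \
 *		((struct page *)page_private(page))
 *	#define page_chain_for_each(page) \
 *		for (; page; page = page_chain_next(page))
 */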
static struct page *__drbd_alloc_pages(struct drbd_device *device,
				       unsigned int number)
{
	struct page *page = NULL;
	struct page *tmp = NULL;
	unsigned int i = 0;

	/* Yes, testing drbd_pp_vacant outside the lock is racy.
	 * So what. It saves a spin_lock. */
	if (drbd_pp_vacant >= number) {
		spin_lock(&drbd_pp_lock);
		page = page_chain_del(&drbd_pp_pool, number);
		if (page)
			drbd_pp_vacant -= number;
		spin_unlock(&drbd_pp_lock);
		if (page)
			return page;
	}

	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	for (i = 0; i < number; i++) {
		tmp = alloc_page(GFP_TRY);
		if (!tmp)
			break;
		set_page_private(tmp, (unsigned long)page);
		page = tmp;
	}

	if (i == number)
		return page;

	/* Not enough pages immediately available this time.
	 * No need to jump around here, drbd_alloc_pages will retry this
	 * function "soon". */
	if (page) {
		tmp = page_chain_tail(page, NULL);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		drbd_pp_vacant += i;
		spin_unlock(&drbd_pp_lock);
	}
	return NULL;
}
static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
					   struct list_head *to_be_freed)
{
	struct drbd_peer_request *peer_req, *tmp;

	/* The EEs are always appended to the end of the list. Since
	   they are sent in order over the wire, they have to finish
	   in order. As soon as we see the first unfinished one, we
	   can stop examining the list... */
	list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
		if (drbd_peer_req_has_active_page(peer_req))
			break;
		list_move(&peer_req->w.list, to_be_freed);
	}
}
static void drbd_reclaim_net_peer_reqs(struct drbd_device *device)
{
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;

	spin_lock_irq(&device->resource->req_lock);
	reclaim_finished_net_peer_reqs(device, &reclaimed);
	spin_unlock_irq(&device->resource->req_lock);
	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_peer_req(device, peer_req);
}
static void conn_reclaim_net_peer_reqs(struct drbd_connection *connection)
{
	struct drbd_peer_device *peer_device;
	int vnr;

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;
		if (!atomic_read(&device->pp_in_use_by_net))
			continue;

		kref_get(&device->kref);
		rcu_read_unlock();
		drbd_reclaim_net_peer_reqs(device);
		kref_put(&device->kref, drbd_destroy_device);
		rcu_read_lock();
	}
	rcu_read_unlock();
}
/**
 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
 * @device:	DRBD device.
 * @number:	number of pages requested
 * @retry:	whether to retry, if not enough pages are available right now
 *
 * Tries to allocate @number pages, first from our own page pool, then by
 * asking the kernel.
 * Possibly retry until DRBD frees sufficient pages somewhere else.
 *
 * If this allocation would exceed the max_buffers setting, we throttle
 * allocation (schedule_timeout) to give the system some room to breathe.
 *
 * We do not use max-buffers as hard limit, because it could lead to
 * congestion and further to a distributed deadlock during online-verify or
 * (checksum based) resync, if the max-buffers, socket buffer sizes and
 * resync-rate settings are mis-configured.
 *
 * Returns a page chain linked via page->private.
 */
struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
			      bool retry)
{
	struct drbd_device *device = peer_device->device;
	struct page *page = NULL;
	struct net_conf *nc;
	DEFINE_WAIT(wait);
	unsigned int mxb;

	rcu_read_lock();
	nc = rcu_dereference(peer_device->connection->net_conf);
	mxb = nc ? nc->max_buffers : 1000000;
	rcu_read_unlock();

	if (atomic_read(&device->pp_in_use) < mxb)
		page = __drbd_alloc_pages(device, number);

	/* Try to keep the fast path fast, but occasionally we need
	 * to reclaim the pages we lent to the network stack. */
	if (page && atomic_read(&device->pp_in_use_by_net) > 512)
		drbd_reclaim_net_peer_reqs(device);

	while (page == NULL) {
		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);

		drbd_reclaim_net_peer_reqs(device);

		if (atomic_read(&device->pp_in_use) < mxb) {
			page = __drbd_alloc_pages(device, number);
			if (page)
				break;
		}

		if (!retry)
			break;

		if (signal_pending(current)) {
			drbd_warn(device, "drbd_alloc_pages interrupted!\n");
			break;
		}

		if (schedule_timeout(HZ/10) == 0)
			mxb = UINT_MAX;
	}
	finish_wait(&drbd_pp_wait, &wait);

	if (page)
		atomic_add(number, &device->pp_in_use);
	return page;
}
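/* Usage sketch (illustrative, not from the original file): a receiver-path
 * caller asks for a page chain and must hand it back via drbd_free_pages():
 *
 *	struct page *page = drbd_alloc_pages(peer_device, nr_pages, true);
 *	if (!page)
 *		return NULL;	// with retry == true, only on signal
 *	...
 *	drbd_free_pages(device, page, 0);
 */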
/* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
 * Is also used from inside another spin_lock_irq(&resource->req_lock);
 * Either links the page chain back to the global pool,
 * or returns all pages to the system. */
static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
{
	atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
	int i;

	if (page == NULL)
		return;

	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
		i = page_chain_free(page);
	else {
		struct page *tmp;
		tmp = page_chain_tail(page, &i);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		drbd_pp_vacant += i;
		spin_unlock(&drbd_pp_lock);
	}
	i = atomic_sub_return(i, a);
	if (i < 0)
		drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
	wake_up(&drbd_pp_wait);
}
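/* Worked example for the high-water check above, assuming 4 KiB pages and a
 * DRBD_MAX_BIO_SIZE of 1 MiB: the global pool keeps at most 256 vacant pages
 * per configured minor; beyond that, returned chains go back to the system
 * via page_chain_free() instead of being re-linked into drbd_pp_pool. */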
/*
 * You need to hold the req_lock:
 *  _drbd_wait_ee_list_empty()
 *
 * You must not have the req_lock:
 *  drbd_alloc_peer_req()
 *  drbd_free_peer_reqs()
 *  drbd_finish_peer_reqs()
 *  drbd_wait_ee_list_empty()
 */
/* normal: payload_size == request size (bi_size)
 * w_same: payload_size == logical_block_size
 * trim: payload_size == 0 */
struct drbd_peer_request *
drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
		    unsigned int request_size, unsigned int payload_size, gfp_t gfp_mask) __must_hold(local)
{
	struct drbd_device *device = peer_device->device;
	struct drbd_peer_request *peer_req;
	struct page *page = NULL;
	unsigned nr_pages = (payload_size + PAGE_SIZE - 1) >> PAGE_SHIFT;

	if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
		return NULL;

	peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
	if (!peer_req) {
		if (!(gfp_mask & __GFP_NOWARN))
			drbd_err(device, "%s: allocation failed\n", __func__);
		return NULL;
	}

	if (nr_pages) {
		page = drbd_alloc_pages(peer_device, nr_pages,
					gfpflags_allow_blocking(gfp_mask));
		if (!page)
			goto fail;
	}

	memset(peer_req, 0, sizeof(*peer_req));
	INIT_LIST_HEAD(&peer_req->w.list);
	drbd_clear_interval(&peer_req->i);
	peer_req->i.size = request_size;
	peer_req->i.sector = sector;
	peer_req->submit_jif = jiffies;
	peer_req->peer_device = peer_device;
	peer_req->pages = page;
	/*
	 * The block_id is opaque to the receiver.  It is not endianness
	 * converted, and sent back to the sender unchanged.
	 */
	peer_req->block_id = id;

	return peer_req;

 fail:
	mempool_free(peer_req, drbd_ee_mempool);
	return NULL;
}
void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
			  int is_net)
{
	if (peer_req->flags & EE_HAS_DIGEST)
		kfree(peer_req->digest);
	drbd_free_pages(device, peer_req->pages, is_net);
	D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
	if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
		peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
		drbd_al_complete_io(device, &peer_req->i);
	}
	mempool_free(peer_req, drbd_ee_mempool);
}
int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
{
	LIST_HEAD(work_list);
	struct drbd_peer_request *peer_req, *t;
	int count = 0;
	int is_net = list == &device->net_ee;

	spin_lock_irq(&device->resource->req_lock);
	list_splice_init(list, &work_list);
	spin_unlock_irq(&device->resource->req_lock);

	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
		__drbd_free_peer_req(device, peer_req, is_net);
		count++;
	}
	return count;
}
/*
 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
 */
static int drbd_finish_peer_reqs(struct drbd_device *device)
{
	LIST_HEAD(work_list);
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;
	int err = 0;

	spin_lock_irq(&device->resource->req_lock);
	reclaim_finished_net_peer_reqs(device, &reclaimed);
	list_splice_init(&device->done_ee, &work_list);
	spin_unlock_irq(&device->resource->req_lock);

	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_peer_req(device, peer_req);

	/* possible callbacks here:
	 * e_end_block, and e_end_resync_block, e_send_superseded.
	 * all ignore the last argument.
	 */
	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
		int err2;

		/* list_del not necessary, next/prev members not touched */
		err2 = peer_req->w.cb(&peer_req->w, !!err);
		if (!err)
			err = err2;
		drbd_free_peer_req(device, peer_req);
	}
	wake_up(&device->ee_wait);

	return err;
}
static void _drbd_wait_ee_list_empty(struct drbd_device *device,
				     struct list_head *head)
{
	DEFINE_WAIT(wait);

	/* avoids spin_lock/unlock
	 * and calling prepare_to_wait in the fast path */
	while (!list_empty(head)) {
		prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
		spin_unlock_irq(&device->resource->req_lock);
		io_schedule();
		finish_wait(&device->ee_wait, &wait);
		spin_lock_irq(&device->resource->req_lock);
	}
}
static void drbd_wait_ee_list_empty(struct drbd_device *device,
				    struct list_head *head)
{
	spin_lock_irq(&device->resource->req_lock);
	_drbd_wait_ee_list_empty(device, head);
	spin_unlock_irq(&device->resource->req_lock);
}
static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
{
	struct kvec iov = {
		.iov_base = buf,
		.iov_len = size,
	};
	struct msghdr msg = {
		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
	};
	return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags);
}
static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
{
	int rv;

	rv = drbd_recv_short(connection->data.socket, buf, size, 0);

	if (rv < 0) {
		if (rv == -ECONNRESET)
			drbd_info(connection, "sock was reset by peer\n");
		else if (rv != -ERESTARTSYS)
			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
	} else if (rv == 0) {
		if (test_bit(DISCONNECT_SENT, &connection->flags)) {
			long t;

			rcu_read_lock();
			t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
			rcu_read_unlock();

			t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
			if (t)
				goto out;
		}
		drbd_info(connection, "sock was shut down by peer\n");
	}

	if (rv != size)
		conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);

out:
	return rv;
}
static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
{
	int err;

	err = drbd_recv(connection, buf, size);
	if (err != size) {
		if (err >= 0)
			err = -EIO;
	} else
		err = 0;
	return err;
}
static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
{
	int err;

	err = drbd_recv_all(connection, buf, size);
	if (err && !signal_pending(current))
		drbd_warn(connection, "short read (expected size %d)\n", (int)size);
	return err;
}
/* quoting tcp(7):
 *   On individual connections, the socket buffer size must be set prior to the
 *   listen(2) or connect(2) calls in order to have it take effect.
 * This is our wrapper to do so.
 */
static void drbd_setbufsize(struct socket *sock, unsigned int snd,
			    unsigned int rcv)
{
	/* open coded SO_SNDBUF, SO_RCVBUF */
	if (snd) {
		sock->sk->sk_sndbuf = snd;
		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
	}
	if (rcv) {
		sock->sk->sk_rcvbuf = rcv;
		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
	}
}
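/* Roughly the userspace equivalent of the open coded assignments above:
 *
 *	setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &snd, sizeof(snd));
 *	setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcv, sizeof(rcv));
 *
 * Setting the SOCK_SNDBUF_LOCK/SOCK_RCVBUF_LOCK bits in sk_userlocks is
 * what setsockopt() does internally; it keeps TCP buffer auto-tuning from
 * overriding the explicitly configured sizes. */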
static struct socket *drbd_try_connect(struct drbd_connection *connection)
{
	const char *what;
	struct socket *sock;
	struct sockaddr_in6 src_in6;
	struct sockaddr_in6 peer_in6;
	struct net_conf *nc;
	int err, peer_addr_len, my_addr_len;
	int sndbuf_size, rcvbuf_size, connect_int;
	int disconnect_on_error = 1;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return NULL;
	}
	sndbuf_size = nc->sndbuf_size;
	rcvbuf_size = nc->rcvbuf_size;
	connect_int = nc->connect_int;
	rcu_read_unlock();

	my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
	memcpy(&src_in6, &connection->my_addr, my_addr_len);

	if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
		src_in6.sin6_port = 0;
	else
		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

	peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
	memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);

	what = "sock_create_kern";
	err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &sock);
	if (err < 0) {
		sock = NULL;
		goto out;
	}

	sock->sk->sk_rcvtimeo =
	sock->sk->sk_sndtimeo = connect_int * HZ;
	drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);

	/* explicitly bind to the configured IP as source IP
	 * for the outgoing connections.
	 * This is needed for multihomed hosts and to be
	 * able to use lo: interfaces for drbd.
	 * Make sure to use 0 as port number, so linux selects
	 * a free one dynamically.
	 */
	what = "bind before connect";
	err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
	if (err < 0)
		goto out;

	/* connect may fail, peer not yet available.
	 * stay C_WF_CONNECTION, don't go Disconnecting! */
	disconnect_on_error = 0;
	what = "connect";
	err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);

out:
	if (err < 0) {
		if (sock) {
			sock_release(sock);
			sock = NULL;
		}
		switch (-err) {
			/* timeout, busy, signal pending */
		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
		case EINTR: case ERESTARTSYS:
			/* peer not (yet) available, network problem */
		case ECONNREFUSED: case ENETUNREACH:
		case EHOSTDOWN:    case EHOSTUNREACH:
			disconnect_on_error = 0;
			break;
		default:
			drbd_err(connection, "%s failed, err = %d\n", what, err);
		}
		if (disconnect_on_error)
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
	}

	return sock;
}
struct accept_wait_data {
	struct drbd_connection *connection;
	struct socket *s_listen;
	struct completion door_bell;
	void (*original_sk_state_change)(struct sock *sk);
};
static void drbd_incoming_connection(struct sock *sk)
{
	struct accept_wait_data *ad = sk->sk_user_data;
	void (*state_change)(struct sock *sk);

	state_change = ad->original_sk_state_change;
	if (sk->sk_state == TCP_ESTABLISHED)
		complete(&ad->door_bell);
	state_change(sk);
}
static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
{
	int err, sndbuf_size, rcvbuf_size, my_addr_len;
	struct sockaddr_in6 my_addr;
	struct socket *s_listen;
	struct net_conf *nc;
	const char *what;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return -EIO;
	}
	sndbuf_size = nc->sndbuf_size;
	rcvbuf_size = nc->rcvbuf_size;
	rcu_read_unlock();

	my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
	memcpy(&my_addr, &connection->my_addr, my_addr_len);

	what = "sock_create_kern";
	err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &s_listen);
	if (err) {
		s_listen = NULL;
		goto out;
	}

	s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
	drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);

	what = "bind before listen";
	err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
	if (err < 0)
		goto out;

	ad->s_listen = s_listen;
	write_lock_bh(&s_listen->sk->sk_callback_lock);
	ad->original_sk_state_change = s_listen->sk->sk_state_change;
	s_listen->sk->sk_state_change = drbd_incoming_connection;
	s_listen->sk->sk_user_data = ad;
	write_unlock_bh(&s_listen->sk->sk_callback_lock);

	what = "listen";
	err = s_listen->ops->listen(s_listen, 5);
	if (err < 0)
		goto out;

	return 0;
out:
	if (s_listen)
		sock_release(s_listen);
	if (err < 0) {
		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
			drbd_err(connection, "%s failed, err = %d\n", what, err);
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
		}
	}

	return -EIO;
}
static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
{
	write_lock_bh(&sk->sk_callback_lock);
	sk->sk_state_change = ad->original_sk_state_change;
	sk->sk_user_data = NULL;
	write_unlock_bh(&sk->sk_callback_lock);
}
static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
{
	int timeo, connect_int, err = 0;
	struct socket *s_estab = NULL;
	struct net_conf *nc;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return NULL;
	}
	connect_int = nc->connect_int;
	rcu_read_unlock();

	timeo = connect_int * HZ;
	/* 28.5% random jitter */
	timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
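	/* Worked example (illustrative): with connect_int = 10 and HZ = 250,
	 * timeo starts at 2500 jiffies; timeo/7 is 357, so the effective
	 * wait becomes 2143 or 2857 jiffies -- a 2/7 ~= 28.5% total spread,
	 * which keeps both peers from retrying in lock step. */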
	err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
	if (err <= 0)
		return NULL;

	err = kernel_accept(ad->s_listen, &s_estab, 0);
	if (err < 0) {
		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
			drbd_err(connection, "accept failed, err = %d\n", err);
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
		}
	}

	if (s_estab)
		unregister_state_change(s_estab->sk, ad);

	return s_estab;
}
static int decode_header(struct drbd_connection *, void *, struct packet_info *);
static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
			     enum drbd_packet cmd)
{
	if (!conn_prepare_command(connection, sock))
		return -EIO;
	return conn_send_command(connection, sock, cmd, 0, NULL, 0);
}
static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
{
	unsigned int header_size = drbd_header_size(connection);
	struct packet_info pi;
	struct net_conf *nc;
	int err;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return -EIO;
	}
	sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
	rcu_read_unlock();

	err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
	if (err != header_size) {
		if (err >= 0)
			err = -EIO;
		return err;
	}
	err = decode_header(connection, connection->data.rbuf, &pi);
	if (err)
		return err;
	return pi.cmd;
}
/**
 * drbd_socket_okay() - Free the socket if its connection is not okay
 * @sock:	pointer to the pointer to the socket.
 */
static bool drbd_socket_okay(struct socket **sock)
{
	int rr;
	char tb[4];

	if (!*sock)
		return false;

	rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);

	if (rr > 0 || rr == -EAGAIN) {
		return true;
	} else {
		sock_release(*sock);
		*sock = NULL;
		return false;
	}
}
static bool connection_established(struct drbd_connection *connection,
				   struct socket **sock1,
				   struct socket **sock2)
{
	struct net_conf *nc;
	int timeout;
	bool ok;

	if (!*sock1 || !*sock2)
		return false;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
	rcu_read_unlock();
	schedule_timeout_interruptible(timeout);

	ok = drbd_socket_okay(sock1);
	ok = drbd_socket_okay(sock2) && ok;

	return ok;
}
/* Gets called if a connection is established, or if a new minor gets created
   in a connection */
int drbd_connected(struct drbd_peer_device *peer_device)
{
	struct drbd_device *device = peer_device->device;
	int err;

	atomic_set(&device->packet_seq, 0);
	device->peer_seq = 0;

	device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
		&peer_device->connection->cstate_mutex :
		&device->own_state_mutex;

	err = drbd_send_sync_param(peer_device);
	if (!err)
		err = drbd_send_sizes(peer_device, 0, 0);
	if (!err)
		err = drbd_send_uuids(peer_device);
	if (!err)
		err = drbd_send_current_state(peer_device);
	clear_bit(USE_DEGR_WFC_T, &device->flags);
	clear_bit(RESIZE_PENDING, &device->flags);
	atomic_set(&device->ap_in_flight, 0);
	mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
	return err;
}
/*
 * Return values:
 *  1 yes, we have a valid connection
 *  0 oops, did not work out, please try again
 * -1 peer talks different language,
 *    no point in trying again, please go standalone.
 * -2 We do not have a network config...
 */
static int conn_connect(struct drbd_connection *connection)
{
	struct drbd_socket sock, msock;
	struct drbd_peer_device *peer_device;
	struct net_conf *nc;
	int vnr, timeout, h;
	bool discard_my_data, ok;
	enum drbd_state_rv rv;
	struct accept_wait_data ad = {
		.connection = connection,
		.door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
	};

	clear_bit(DISCONNECT_SENT, &connection->flags);
	if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
		return -2;

	mutex_init(&sock.mutex);
	sock.sbuf = connection->data.sbuf;
	sock.rbuf = connection->data.rbuf;
	sock.socket = NULL;
	mutex_init(&msock.mutex);
	msock.sbuf = connection->meta.sbuf;
	msock.rbuf = connection->meta.rbuf;
	msock.socket = NULL;

	/* Assume that the peer only understands protocol 80 until we know better. */
	connection->agreed_pro_version = 80;

	if (prepare_listen_socket(connection, &ad))
		return 0;

	do {
		struct socket *s;

		s = drbd_try_connect(connection);
		if (s) {
			if (!sock.socket) {
				sock.socket = s;
				send_first_packet(connection, &sock, P_INITIAL_DATA);
			} else if (!msock.socket) {
				clear_bit(RESOLVE_CONFLICTS, &connection->flags);
				msock.socket = s;
				send_first_packet(connection, &msock, P_INITIAL_META);
			} else {
				drbd_err(connection, "Logic error in conn_connect()\n");
				goto out_release_sockets;
			}
		}

		if (connection_established(connection, &sock.socket, &msock.socket))
			break;

retry:
		s = drbd_wait_for_connect(connection, &ad);
		if (s) {
			int fp = receive_first_packet(connection, s);
			drbd_socket_okay(&sock.socket);
			drbd_socket_okay(&msock.socket);
			switch (fp) {
			case P_INITIAL_DATA:
				if (sock.socket) {
					drbd_warn(connection, "initial packet S crossed\n");
					sock_release(sock.socket);
					sock.socket = s;
					goto randomize;
				}
				sock.socket = s;
				break;
			case P_INITIAL_META:
				set_bit(RESOLVE_CONFLICTS, &connection->flags);
				if (msock.socket) {
					drbd_warn(connection, "initial packet M crossed\n");
					sock_release(msock.socket);
					msock.socket = s;
					goto randomize;
				}
				msock.socket = s;
				break;
			default:
				drbd_warn(connection, "Error receiving initial packet\n");
				sock_release(s);
randomize:
				if (prandom_u32() & 1)
					goto retry;
			}
		}

		if (connection->cstate <= C_DISCONNECTING)
			goto out_release_sockets;
		if (signal_pending(current)) {
			flush_signals(current);
			smp_rmb();
			if (get_t_state(&connection->receiver) == EXITING)
				goto out_release_sockets;
		}

		ok = connection_established(connection, &sock.socket, &msock.socket);
	} while (!ok);

	if (ad.s_listen)
		sock_release(ad.s_listen);

	sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
	msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */

	sock.socket->sk->sk_allocation = GFP_NOIO;
	msock.socket->sk->sk_allocation = GFP_NOIO;

	sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
	msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;

	/* NOT YET ...
	 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
	 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
	 * first set it to the P_CONNECTION_FEATURES timeout,
	 * which we set to 4x the configured ping_timeout. */
	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);

	sock.socket->sk->sk_sndtimeo =
	sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;

	msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
	timeout = nc->timeout * HZ / 10;
	discard_my_data = nc->discard_my_data;
	rcu_read_unlock();

	msock.socket->sk->sk_sndtimeo = timeout;

	/* we don't want delays.
	 * we use TCP_CORK where appropriate, though */
	drbd_tcp_nodelay(sock.socket);
	drbd_tcp_nodelay(msock.socket);

	connection->data.socket = sock.socket;
	connection->meta.socket = msock.socket;
	connection->last_received = jiffies;

	h = drbd_do_features(connection);
	if (h <= 0)
		return h;

	if (connection->cram_hmac_tfm) {
		/* drbd_request_state(device, NS(conn, WFAuth)); */
		switch (drbd_do_auth(connection)) {
		case -1:
			drbd_err(connection, "Authentication of peer failed\n");
			return -1;
		case 0:
			drbd_err(connection, "Authentication of peer failed, trying again.\n");
			return 0;
		}
	}

	connection->data.socket->sk->sk_sndtimeo = timeout;
	connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;

	if (drbd_send_protocol(connection) == -EOPNOTSUPP)
		return -1;

	/* Prevent a race between resync-handshake and
	 * being promoted to Primary.
	 *
	 * Grab and release the state mutex, so we know that any current
	 * drbd_set_role() is finished, and any incoming drbd_set_role
	 * will see the STATE_SENT flag, and wait for it to be cleared.
	 */
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
		mutex_lock(peer_device->device->state_mutex);

	set_bit(STATE_SENT, &connection->flags);

	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
		mutex_unlock(peer_device->device->state_mutex);

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;
		kref_get(&device->kref);
		rcu_read_unlock();

		if (discard_my_data)
			set_bit(DISCARD_MY_DATA, &device->flags);
		else
			clear_bit(DISCARD_MY_DATA, &device->flags);

		drbd_connected(peer_device);
		kref_put(&device->kref, drbd_destroy_device);
		rcu_read_lock();
	}
	rcu_read_unlock();

	rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
	if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
		clear_bit(STATE_SENT, &connection->flags);
		return 0;
	}

	drbd_thread_start(&connection->ack_receiver);
	/* opencoded create_singlethread_workqueue(),
	 * to be able to use format string arguments */
	connection->ack_sender =
		alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name);
	if (!connection->ack_sender) {
		drbd_err(connection, "Failed to create workqueue ack_sender\n");
		return 0;
	}

	mutex_lock(&connection->resource->conf_update);
	/* The discard_my_data flag is a single-shot modifier to the next
	 * connection attempt, the handshake of which is now well underway.
	 * No need for rcu style copying of the whole struct
	 * just to clear a single value. */
	connection->net_conf->discard_my_data = 0;
	mutex_unlock(&connection->resource->conf_update);

	return h;

out_release_sockets:
	if (ad.s_listen)
		sock_release(ad.s_listen);
	if (sock.socket)
		sock_release(sock.socket);
	if (msock.socket)
		sock_release(msock.socket);
	return -1;
}
static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
{
	unsigned int header_size = drbd_header_size(connection);

	if (header_size == sizeof(struct p_header100) &&
	    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
		struct p_header100 *h = header;
		if (h->pad != 0) {
			drbd_err(connection, "Header padding is not zero\n");
			return -EINVAL;
		}
		pi->vnr = be16_to_cpu(h->volume);
		pi->cmd = be16_to_cpu(h->command);
		pi->size = be32_to_cpu(h->length);
	} else if (header_size == sizeof(struct p_header95) &&
		   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
		struct p_header95 *h = header;
		pi->cmd = be16_to_cpu(h->command);
		pi->size = be32_to_cpu(h->length);
		pi->vnr = 0;
	} else if (header_size == sizeof(struct p_header80) &&
		   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
		struct p_header80 *h = header;
		pi->cmd = be16_to_cpu(h->command);
		pi->size = be16_to_cpu(h->length);
		pi->vnr = 0;
	} else {
		drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
			 be32_to_cpu(*(__be32 *)header),
			 connection->agreed_pro_version);
		return -EINVAL;
	}
	pi->data = header + header_size;
	return 0;
}
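/* For orientation, the three on-wire header layouts decoded above look
 * roughly like this (simplified sketch; the authoritative, packed
 * definitions live in drbd_protocol.h):
 *
 *	struct p_header80  { u32 magic; u16 command; u16 length; };
 *	struct p_header95  { u16 magic; u16 command; u32 length; };
 *	struct p_header100 { u32 magic; u16 volume; u16 command;
 *			     u32 length; u32 pad; };
 *
 * with all fields in big-endian byte order. */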
static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
{
	void *buffer = connection->data.rbuf;
	int err;

	err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
	if (err)
		return err;

	err = decode_header(connection, buffer, pi);
	connection->last_received = jiffies;

	return err;
}
/* This is blkdev_issue_flush, but asynchronous.
 * We want to submit to all component volumes in parallel,
 * then wait for all completions.
 */
struct issue_flush_context {
	atomic_t pending;
	int error;
	struct completion done;
};
struct one_flush_context {
	struct drbd_device *device;
	struct issue_flush_context *ctx;
};
void one_flush_endio(struct bio *bio)
{
	struct one_flush_context *octx = bio->bi_private;
	struct drbd_device *device = octx->device;
	struct issue_flush_context *ctx = octx->ctx;

	if (bio->bi_status) {
		ctx->error = blk_status_to_errno(bio->bi_status);
		drbd_info(device, "local disk FLUSH FAILED with status %d\n", bio->bi_status);
	}
	kfree(octx);
	bio_put(bio);

	clear_bit(FLUSH_PENDING, &device->flags);
	put_ldev(device);
	kref_put(&device->kref, drbd_destroy_device);

	if (atomic_dec_and_test(&ctx->pending))
		complete(&ctx->done);
}
static void submit_one_flush(struct drbd_device *device, struct issue_flush_context *ctx)
{
	struct bio *bio = bio_alloc(GFP_NOIO, 0);
	struct one_flush_context *octx = kmalloc(sizeof(*octx), GFP_NOIO);
	if (!bio || !octx) {
		drbd_warn(device, "Could not allocate a bio, CANNOT ISSUE FLUSH\n");
		/* FIXME: what else can I do now?  disconnecting or detaching
		 * really does not help to improve the state of the world, either.
		 */
		kfree(octx);
		if (bio)
			bio_put(bio);

		ctx->error = -ENOMEM;
		put_ldev(device);
		kref_put(&device->kref, drbd_destroy_device);
		return;
	}

	octx->device = device;
	octx->ctx = ctx;
	bio->bi_bdev = device->ldev->backing_bdev;
	bio->bi_private = octx;
	bio->bi_end_io = one_flush_endio;
	bio->bi_opf = REQ_OP_FLUSH | REQ_PREFLUSH;

	device->flush_jif = jiffies;
	set_bit(FLUSH_PENDING, &device->flags);
	atomic_inc(&ctx->pending);
	submit_bio(bio);
}
static void drbd_flush(struct drbd_connection *connection)
{
	if (connection->resource->write_ordering >= WO_BDEV_FLUSH) {
		struct drbd_peer_device *peer_device;
		struct issue_flush_context ctx;
		int vnr;

		atomic_set(&ctx.pending, 1);
		ctx.error = 0;
		init_completion(&ctx.done);

		rcu_read_lock();
		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
			struct drbd_device *device = peer_device->device;

			if (!get_ldev(device))
				continue;
			kref_get(&device->kref);
			rcu_read_unlock();

			submit_one_flush(device, &ctx);

			rcu_read_lock();
		}
		rcu_read_unlock();

		/* Do we want to add a timeout,
		 * if disk-timeout is set? */
		if (!atomic_dec_and_test(&ctx.pending))
			wait_for_completion(&ctx.done);

		if (ctx.error) {
			/* would rather check on EOPNOTSUPP, but that is not reliable.
			 * don't try again for ANY return value != 0
			 * if (rv == -EOPNOTSUPP) */
			/* Any error is already reported by bio_endio callback. */
			drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
		}
	}
}
/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
 * @device:	DRBD device.
 * @epoch:	Epoch object.
 * @ev:		Epoch event.
 */
static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
					       struct drbd_epoch *epoch,
					       enum epoch_event ev)
{
	int epoch_size;
	struct drbd_epoch *next_epoch;
	enum finish_epoch rv = FE_STILL_LIVE;

	spin_lock(&connection->epoch_lock);
	do {
		next_epoch = NULL;

		epoch_size = atomic_read(&epoch->epoch_size);

		switch (ev & ~EV_CLEANUP) {
		case EV_PUT:
			atomic_dec(&epoch->active);
			break;
		case EV_GOT_BARRIER_NR:
			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
			break;
		case EV_BECAME_LAST:
			/* nothing to do */
			break;
		}

		if (epoch_size != 0 &&
		    atomic_read(&epoch->active) == 0 &&
		    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
			if (!(ev & EV_CLEANUP)) {
				spin_unlock(&connection->epoch_lock);
				drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
				spin_lock(&connection->epoch_lock);
			}
			/* FIXME: dec unacked on connection, once we have
			 * something to count pending connection packets in. */
			if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
				dec_unacked(epoch->connection);

			if (connection->current_epoch != epoch) {
				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
				list_del(&epoch->list);
				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
				connection->epochs--;
				kfree(epoch);

				if (rv == FE_STILL_LIVE)
					rv = FE_DESTROYED;
			} else {
				epoch->flags = 0;
				atomic_set(&epoch->epoch_size, 0);
				/* atomic_set(&epoch->active, 0); is already zero */
				if (rv == FE_STILL_LIVE)
					rv = FE_RECYCLED;
			}
		}

		if (!next_epoch)
			break;

		epoch = next_epoch;
	} while (1);

	spin_unlock(&connection->epoch_lock);

	return rv;
}
static enum write_ordering_e
max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
{
	struct disk_conf *dc;

	dc = rcu_dereference(bdev->disk_conf);

	if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
		wo = WO_DRAIN_IO;
	if (wo == WO_DRAIN_IO && !dc->disk_drain)
		wo = WO_NONE;

	return wo;
}
/**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
 * @connection:	DRBD connection.
 * @wo:		Write ordering method to try.
 */
void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
			      enum write_ordering_e wo)
{
	struct drbd_device *device;
	enum write_ordering_e pwo;
	int vnr;
	static char *write_ordering_str[] = {
		[WO_NONE] = "none",
		[WO_DRAIN_IO] = "drain",
		[WO_BDEV_FLUSH] = "flush",
	};

	pwo = resource->write_ordering;
	if (wo != WO_BDEV_FLUSH)
		wo = min(pwo, wo);
	rcu_read_lock();
	idr_for_each_entry(&resource->devices, device, vnr) {
		if (get_ldev(device)) {
			wo = max_allowed_wo(device->ldev, wo);
			if (device->ldev == bdev)
				bdev = NULL;
			put_ldev(device);
		}
	}

	if (bdev)
		wo = max_allowed_wo(bdev, wo);

	rcu_read_unlock();

	resource->write_ordering = wo;
	if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH)
		drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
}
static void drbd_issue_peer_discard(struct drbd_device *device, struct drbd_peer_request *peer_req)
{
	struct block_device *bdev = device->ldev->backing_bdev;

	if (blkdev_issue_zeroout(bdev, peer_req->i.sector, peer_req->i.size >> 9,
			GFP_NOIO, 0))
		peer_req->flags |= EE_WAS_ERROR;

	drbd_endio_write_sec_final(peer_req);
}
static void drbd_issue_peer_wsame(struct drbd_device *device,
				  struct drbd_peer_request *peer_req)
{
	struct block_device *bdev = device->ldev->backing_bdev;
	sector_t s = peer_req->i.sector;
	sector_t nr = peer_req->i.size >> 9;
	if (blkdev_issue_write_same(bdev, s, nr, GFP_NOIO, peer_req->pages))
		peer_req->flags |= EE_WAS_ERROR;
	drbd_endio_write_sec_final(peer_req);
}
/**
 * drbd_submit_peer_request()
 * @device:	DRBD device.
 * @peer_req:	peer request
 * @rw:		flag field, see bio->bi_opf
 *
 * May spread the pages to multiple bios,
 * depending on bio_add_page restrictions.
 *
 * Returns 0 if all bios have been submitted,
 * -ENOMEM if we could not allocate enough bios,
 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
 *  single page to an empty bio (which should never happen and likely indicates
 *  that the lower level IO stack is in some way broken). This has been observed
 *  on certain Xen deployments.
 */
/* TODO allocate from our own bio_set. */
int drbd_submit_peer_request(struct drbd_device *device,
			     struct drbd_peer_request *peer_req,
			     const unsigned op, const unsigned op_flags,
			     const int fault_type)
{
	struct bio *bios = NULL;
	struct bio *bio;
	struct page *page = peer_req->pages;
	sector_t sector = peer_req->i.sector;
	unsigned data_size = peer_req->i.size;
	unsigned n_bios = 0;
	unsigned nr_pages = (data_size + PAGE_SIZE - 1) >> PAGE_SHIFT;
	int err = -ENOMEM;

	/* TRIM/DISCARD: for now, always use the helper function
	 * blkdev_issue_zeroout(..., discard=true).
	 * It's synchronous, but it does the right thing wrt. bio splitting.
	 * Correctness first, performance later.  Next step is to code an
	 * asynchronous variant of the same.
	 */
	if (peer_req->flags & (EE_IS_TRIM|EE_WRITE_SAME)) {
		/* wait for all pending IO completions, before we start
		 * zeroing things out. */
		conn_wait_active_ee_empty(peer_req->peer_device->connection);
		/* add it to the active list now,
		 * so we can find it to present it in debugfs */
		peer_req->submit_jif = jiffies;
		peer_req->flags |= EE_SUBMITTED;

		/* If this was a resync request from receive_rs_deallocated(),
		 * it is already on the sync_ee list */
		if (list_empty(&peer_req->w.list)) {
			spin_lock_irq(&device->resource->req_lock);
			list_add_tail(&peer_req->w.list, &device->active_ee);
			spin_unlock_irq(&device->resource->req_lock);
		}

		if (peer_req->flags & EE_IS_TRIM)
			drbd_issue_peer_discard(device, peer_req);
		else /* EE_WRITE_SAME */
			drbd_issue_peer_wsame(device, peer_req);
		return 0;
	}

	/* In most cases, we will only need one bio.  But in case the lower
	 * level restrictions happen to be different at this offset on this
	 * side than those of the sending peer, we may need to submit the
	 * request in more than one bio.
	 *
	 * Plain bio_alloc is good enough here, this is no DRBD internally
	 * generated bio, but a bio allocated on behalf of the peer.
	 */
next_bio:
	bio = bio_alloc(GFP_NOIO, nr_pages);
	if (!bio) {
		drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
		goto fail;
	}
	/* > peer_req->i.sector, unless this is the first bio */
	bio->bi_iter.bi_sector = sector;
	bio->bi_bdev = device->ldev->backing_bdev;
	bio_set_op_attrs(bio, op, op_flags);
	bio->bi_private = peer_req;
	bio->bi_end_io = drbd_peer_request_endio;

	bio->bi_next = bios;
	bios = bio;
	++n_bios;

	page_chain_for_each(page) {
		unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
		if (!bio_add_page(bio, page, len, 0))
			goto next_bio;
		data_size -= len;
		sector += len >> 9;
		--nr_pages;
	}
	D_ASSERT(device, data_size == 0);
	D_ASSERT(device, page == NULL);

	atomic_set(&peer_req->pending_bios, n_bios);
	/* for debugfs: update timestamp, mark as submitted */
	peer_req->submit_jif = jiffies;
	peer_req->flags |= EE_SUBMITTED;
	do {
		bio = bios;
		bios = bios->bi_next;
		bio->bi_next = NULL;

		drbd_generic_make_request(device, fault_type, bio);
	} while (bios);
	return 0;

fail:
	while (bios) {
		bio = bios;
		bios = bios->bi_next;
		bio_put(bio);
	}
	return err;
}
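/* Worked example for the bio splitting above (illustrative): a 68 KiB peer
 * request is 17 pages. If the backing queue happens to limit a bio to 16
 * segments, bio_add_page() fails on the 17th page; the partially filled bio
 * stays on the "bios" list and the page_chain loop jumps back to next_bio,
 * starting a second bio at the already advanced "sector". n_bios ends up as
 * 2, and both bios are submitted in the do/while loop at the end. */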
static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
					     struct drbd_peer_request *peer_req)
{
	struct drbd_interval *i = &peer_req->i;

	drbd_remove_interval(&device->write_requests, i);
	drbd_clear_interval(i);

	/* Wake up any processes waiting for this peer request to complete. */
	if (i->waiting)
		wake_up(&device->misc_wait);
}
static void conn_wait_active_ee_empty(struct drbd_connection *connection)
{
	struct drbd_peer_device *peer_device;
	int vnr;

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;

		kref_get(&device->kref);
		rcu_read_unlock();
		drbd_wait_ee_list_empty(device, &device->active_ee);
		kref_put(&device->kref, drbd_destroy_device);
		rcu_read_lock();
	}
	rcu_read_unlock();
}
static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
{
	int rv;
	struct p_barrier *p = pi->data;
	struct drbd_epoch *epoch;

	/* FIXME these are unacked on connection,
	 * not a specific (peer)device.
	 */
	connection->current_epoch->barrier_nr = p->barrier;
	connection->current_epoch->connection = connection;
	rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);

	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
	 * the activity log, which means it would not be resynced in case the
	 * R_PRIMARY crashes now.
	 * Therefore we must send the barrier_ack after the barrier request was
	 * completed. */
	switch (connection->resource->write_ordering) {
	case WO_NONE:
		if (rv == FE_RECYCLED)
			return 0;

		/* receiver context, in the writeout path of the other node.
		 * avoid potential distributed deadlock */
		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
		if (epoch)
			break;
		else
			drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
		/* Fall through */

	case WO_BDEV_FLUSH:
	case WO_DRAIN_IO:
		conn_wait_active_ee_empty(connection);
		drbd_flush(connection);

		if (atomic_read(&connection->current_epoch->epoch_size)) {
			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
			if (epoch)
				break;
		}

		return 0;
	default:
		drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
			 connection->resource->write_ordering);
		return -EIO;
	}

	epoch->flags = 0;
	atomic_set(&epoch->epoch_size, 0);
	atomic_set(&epoch->active, 0);

	spin_lock(&connection->epoch_lock);
	if (atomic_read(&connection->current_epoch->epoch_size)) {
		list_add(&epoch->list, &connection->current_epoch->list);
		connection->current_epoch = epoch;
		connection->epochs++;
	} else {
		/* The current_epoch got recycled while we allocated this one... */
		kfree(epoch);
	}
	spin_unlock(&connection->epoch_lock);

	return 0;
}
/* quick wrapper in case payload size != request_size (write same) */
static void drbd_csum_ee_size(struct crypto_ahash *h,
			      struct drbd_peer_request *r, void *d,
			      unsigned int payload_size)
{
	unsigned int tmp = r->i.size;
	r->i.size = payload_size;
	drbd_csum_ee(h, r, d);
	r->i.size = tmp;
}
/* used from receive_RSDataReply (recv_resync_read)
 * and from receive_Data.
 * data_size: actual payload ("data in")
 *	for normal writes that is bi_size.
 *	for discards, that is zero.
 *	for write same, it is logical_block_size.
 * both trim and write same have the bi_size ("data len to be affected")
 * as extra argument in the packet header.
 */
static struct drbd_peer_request *
read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
	      struct packet_info *pi) __must_hold(local)
{
	struct drbd_device *device = peer_device->device;
	const sector_t capacity = drbd_get_capacity(device->this_bdev);
	struct drbd_peer_request *peer_req;
	struct page *page;
	int digest_size, err;
	unsigned int data_size = pi->size, ds;
	void *dig_in = peer_device->connection->int_dig_in;
	void *dig_vv = peer_device->connection->int_dig_vv;
	unsigned long *data;
	struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
	struct p_trim *wsame = (pi->cmd == P_WSAME) ? pi->data : NULL;

	digest_size = 0;
	if (!trim && peer_device->connection->peer_integrity_tfm) {
		digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
		/*
		 * FIXME: Receive the incoming digest into the receive buffer
		 *	  here, together with its struct p_data?
		 */
		err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
		if (err)
			return NULL;
		data_size -= digest_size;
	}

	/* assume request_size == data_size, but special case trim and wsame. */
	ds = data_size;
	if (trim) {
		if (!expect(data_size == 0))
			return NULL;
		ds = be32_to_cpu(trim->size);
	} else if (wsame) {
		if (data_size != queue_logical_block_size(device->rq_queue)) {
			drbd_err(peer_device, "data size (%u) != drbd logical block size (%u)\n",
				data_size, queue_logical_block_size(device->rq_queue));
			return NULL;
		}
		if (data_size != bdev_logical_block_size(device->ldev->backing_bdev)) {
			drbd_err(peer_device, "data size (%u) != backend logical block size (%u)\n",
				data_size, bdev_logical_block_size(device->ldev->backing_bdev));
			return NULL;
		}
		ds = be32_to_cpu(wsame->size);
	}

	if (!expect(IS_ALIGNED(ds, 512)))
		return NULL;
	if (trim || wsame) {
		if (!expect(ds <= (DRBD_MAX_BBIO_SECTORS << 9)))
			return NULL;
	} else if (!expect(ds <= DRBD_MAX_BIO_SIZE))
		return NULL;

	/* even though we trust our peer,
	 * we sometimes have to double check. */
	if (sector + (ds >> 9) > capacity) {
		drbd_err(device, "request from peer beyond end of local disk: "
			"capacity: %llus < sector: %llus + size: %u\n",
			(unsigned long long)capacity,
			(unsigned long long)sector, ds);
		return NULL;
	}

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	peer_req = drbd_alloc_peer_req(peer_device, id, sector, ds, data_size, GFP_NOIO);
	if (!peer_req)
		return NULL;

	peer_req->flags |= EE_WRITE;
	if (trim) {
		peer_req->flags |= EE_IS_TRIM;
		return peer_req;
	}
	if (wsame)
		peer_req->flags |= EE_WRITE_SAME;

	/* receive payload size bytes into page chain */
	ds = data_size;
	page = peer_req->pages;
	page_chain_for_each(page) {
		unsigned len = min_t(int, ds, PAGE_SIZE);
		data = kmap(page);
		err = drbd_recv_all_warn(peer_device->connection, data, len);
		if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
			drbd_err(device, "Fault injection: Corrupting data on receive\n");
			data[0] = data[0] ^ (unsigned long)-1;
		}
		kunmap(page);
		if (err) {
			drbd_free_peer_req(device, peer_req);
			return NULL;
		}
		ds -= len;
	}

	if (digest_size) {
		drbd_csum_ee_size(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv, data_size);
		if (memcmp(dig_in, dig_vv, digest_size)) {
			drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
				(unsigned long long)sector, data_size);
			drbd_free_peer_req(device, peer_req);
			return NULL;
		}
	}
	device->recv_cnt += data_size >> 9;
	return peer_req;
}
/* drbd_drain_block() just takes a data block
 * out of the socket input buffer, and discards it.
 */
static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
{
	struct page *page;
	int err = 0;
	void *data;

	if (!data_size)
		return 0;

	page = drbd_alloc_pages(peer_device, 1, 1);

	data = kmap(page);
	while (data_size) {
		unsigned int len = min_t(int, data_size, PAGE_SIZE);

		err = drbd_recv_all_warn(peer_device->connection, data, len);
		if (err)
			break;
		data_size -= len;
	}
	kunmap(page);
	drbd_free_pages(peer_device->device, page, 0);
	return err;
}
static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
			   sector_t sector, int data_size)
{
	struct bio_vec bvec;
	struct bvec_iter iter;
	struct bio *bio;
	int digest_size, err, expect;
	void *dig_in = peer_device->connection->int_dig_in;
	void *dig_vv = peer_device->connection->int_dig_vv;

	digest_size = 0;
	if (peer_device->connection->peer_integrity_tfm) {
		digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
		err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
		if (err)
			return err;
		data_size -= digest_size;
	}

	/* optimistically update recv_cnt.  if receiving fails below,
	 * we disconnect anyways, and counters will be reset. */
	peer_device->device->recv_cnt += data_size >> 9;

	bio = req->master_bio;
	D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);

	bio_for_each_segment(bvec, bio, iter) {
		void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
		expect = min_t(int, data_size, bvec.bv_len);
		err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
		kunmap(bvec.bv_page);
		if (err)
			return err;
		data_size -= expect;
	}

	if (digest_size) {
		drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
		if (memcmp(dig_in, dig_vv, digest_size)) {
			drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
			return -EINVAL;
		}
	}

	D_ASSERT(peer_device->device, data_size == 0);
	return 0;
}
/*
 * e_end_resync_block() is called in ack_sender context via
 * drbd_finish_peer_reqs().
 */
static int e_end_resync_block(struct drbd_work *w, int unused)
{
	struct drbd_peer_request *peer_req =
		container_of(w, struct drbd_peer_request, w);
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
	sector_t sector = peer_req->i.sector;
	int err;

	D_ASSERT(device, drbd_interval_empty(&peer_req->i));

	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		drbd_set_in_sync(device, sector, peer_req->i.size);
		err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
	} else {
		/* Record failure to sync */
		drbd_rs_failed_io(device, sector, peer_req->i.size);

		err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
	}
	dec_unacked(device);

	return err;
}
static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
			    struct packet_info *pi) __releases(local)
{
	struct drbd_device *device = peer_device->device;
	struct drbd_peer_request *peer_req;

	peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
	if (!peer_req)
		goto fail;

	dec_rs_pending(device);

	inc_unacked(device);
	/* corresponding dec_unacked() in e_end_resync_block()
	 * respective _drbd_clear_done_ee */

	peer_req->w.cb = e_end_resync_block;
	peer_req->submit_jif = jiffies;

	spin_lock_irq(&device->resource->req_lock);
	list_add_tail(&peer_req->w.list, &device->sync_ee);
	spin_unlock_irq(&device->resource->req_lock);

	atomic_add(pi->size >> 9, &device->rs_sect_ev);
	if (drbd_submit_peer_request(device, peer_req, REQ_OP_WRITE, 0,
				     DRBD_FAULT_RS_WR) == 0)
		return 0;

	/* don't care for the reason here */
	drbd_err(device, "submit failed, triggering re-connect\n");
	spin_lock_irq(&device->resource->req_lock);
	list_del(&peer_req->w.list);
	spin_unlock_irq(&device->resource->req_lock);

	drbd_free_peer_req(device, peer_req);
fail:
	put_ldev(device);
	return -EIO;
}
static struct drbd_request *
find_request(struct drbd_device *device, struct rb_root *root, u64 id,
	     sector_t sector, bool missing_ok, const char *func)
{
	struct drbd_request *req;

	/* Request object according to our peer */
	req = (struct drbd_request *)(unsigned long)id;
	if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
		return req;
	if (!missing_ok) {
		drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
			(unsigned long)id, (unsigned long long)sector);
	}
	return NULL;
}
static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct drbd_request *req;
	sector_t sector;
	int err;
	struct p_data *p = pi->data;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	sector = be64_to_cpu(p->sector);

	spin_lock_irq(&device->resource->req_lock);
	req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
	spin_unlock_irq(&device->resource->req_lock);
	if (unlikely(!req))
		return -EIO;

	/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
	 * special casing it there for the various failure cases.
	 * still no race with drbd_fail_pending_reads */
	err = recv_dless_read(peer_device, req, sector, pi->size);
	if (!err)
		req_mod(req, DATA_RECEIVED);
	/* else: nothing. handled from drbd_disconnect...
	 * I don't think we may complete this just yet
	 * in case we are "on-disconnect: freeze" */

	return err;
}
static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	sector_t sector;
	int err;
	struct p_data *p = pi->data;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	sector = be64_to_cpu(p->sector);
	D_ASSERT(device, p->block_id == ID_SYNCER);

	if (get_ldev(device)) {
		/* data is submitted to disk within recv_resync_read.
		 * corresponding put_ldev done below on error,
		 * or in drbd_peer_request_endio. */
		err = recv_resync_read(peer_device, sector, pi);
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			drbd_err(device, "Can not write resync data to local disk.\n");

		err = drbd_drain_block(peer_device, pi->size);

		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
	}

	atomic_add(pi->size >> 9, &device->rs_sect_in);

	return err;
}
static void restart_conflicting_writes(struct drbd_device *device,
				       sector_t sector, int size)
{
	struct drbd_interval *i;
	struct drbd_request *req;

	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
		if (!i->local)
			continue;
		req = container_of(i, struct drbd_request, i);
		if (req->rq_state & RQ_LOCAL_PENDING ||
		    !(req->rq_state & RQ_POSTPONED))
			continue;
		/* as it is RQ_POSTPONED, this will cause it to
		 * be queued on the retry workqueue. */
		__req_mod(req, CONFLICT_RESOLVED, NULL);
	}
}
/*
 * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
 */
static int e_end_block(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req =
		container_of(w, struct drbd_peer_request, w);
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
	sector_t sector = peer_req->i.sector;
	int err = 0, pcmd;

	if (peer_req->flags & EE_SEND_WRITE_ACK) {
		if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
			pcmd = (device->state.conn >= C_SYNC_SOURCE &&
				device->state.conn <= C_PAUSED_SYNC_T &&
				peer_req->flags & EE_MAY_SET_IN_SYNC) ?
				P_RS_WRITE_ACK : P_WRITE_ACK;
			err = drbd_send_ack(peer_device, pcmd, peer_req);
			if (pcmd == P_RS_WRITE_ACK)
				drbd_set_in_sync(device, sector, peer_req->i.size);
		} else {
			err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
			/* we expect it to be marked out of sync anyways...
			 * maybe assert this? */
		}
		dec_unacked(device);
	}

	/* we delete from the conflict detection hash _after_ we sent out the
	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
	if (peer_req->flags & EE_IN_INTERVAL_TREE) {
		spin_lock_irq(&device->resource->req_lock);
		D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
		drbd_remove_epoch_entry_interval(device, peer_req);
		if (peer_req->flags & EE_RESTART_REQUESTS)
			restart_conflicting_writes(device, sector, peer_req->i.size);
		spin_unlock_irq(&device->resource->req_lock);
	} else
		D_ASSERT(device, drbd_interval_empty(&peer_req->i));

	drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));

	return err;
}
static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
{
	struct drbd_peer_request *peer_req =
		container_of(w, struct drbd_peer_request, w);
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	int err;

	err = drbd_send_ack(peer_device, ack, peer_req);
	dec_unacked(peer_device->device);

	return err;
}

static int e_send_superseded(struct drbd_work *w, int unused)
{
	return e_send_ack(w, P_SUPERSEDED);
}
static int e_send_retry_write(struct drbd_work *w, int unused)
{
	struct drbd_peer_request *peer_req =
		container_of(w, struct drbd_peer_request, w);
	struct drbd_connection *connection = peer_req->peer_device->connection;

	return e_send_ack(w, connection->agreed_pro_version >= 100 ?
			     P_RETRY_WRITE : P_SUPERSEDED);
}
static bool seq_greater(u32 a, u32 b)
{
	/*
	 * We assume 32-bit wrap-around here.
	 * For 24-bit wrap-around, we would have to shift:
	 *  a <<= 8; b <<= 8;
	 */
	return (s32)a - (s32)b > 0;
}

static u32 seq_max(u32 a, u32 b)
{
	return seq_greater(a, b) ? a : b;
}
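/* Example: seq_greater(1, 0xffffffff) is true, since
 * (s32)1 - (s32)0xffffffff == 1 - (-1) == 2 > 0. The signed subtraction
 * keeps the comparison correct across the 32-bit wrap, as long as the two
 * sequence numbers are less than 2^31 apart. */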
static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
{
	struct drbd_device *device = peer_device->device;
	unsigned int newest_peer_seq;

	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
		spin_lock(&device->peer_seq_lock);
		newest_peer_seq = seq_max(device->peer_seq, peer_seq);
		device->peer_seq = newest_peer_seq;
		spin_unlock(&device->peer_seq_lock);
		/* wake up only if we actually changed device->peer_seq */
		if (peer_seq == newest_peer_seq)
			wake_up(&device->seq_wait);
	}
}
static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
{
	return !((s1 + (l1 >> 9) <= s2) || (s1 >= s2 + (l2 >> 9)));
}
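
/*
 * Illustrative sketch, not part of the driver: l1 and l2 are byte counts and
 * sectors are 512 bytes, so l >> 9 converts bytes to sectors. Two extents
 * overlap iff neither ends at or before the other starts:
 * overlaps(0, 4096, 7, 4096) is true (sectors 0..7 and 7..14 share sector 7),
 * while overlaps(0, 4096, 8, 4096) is false (they merely touch at sector 8).
 */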
/* maybe change sync_ee into interval trees as well? */
static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
{
	struct drbd_peer_request *rs_req;
	bool rv = false;

	spin_lock_irq(&device->resource->req_lock);
	list_for_each_entry(rs_req, &device->sync_ee, w.list) {
		if (overlaps(peer_req->i.sector, peer_req->i.size,
			     rs_req->i.sector, rs_req->i.size)) {
			rv = true;
			break;
		}
	}
	spin_unlock_irq(&device->resource->req_lock);

	return rv;
}
/* Called from receive_Data.
 * Synchronize packets on sock with packets on msock.
 *
 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
 * packet traveling on msock, they are still processed in the order they have
 * been sent.
 *
 * Note: we don't care for Ack packets overtaking P_DATA packets.
 *
 * In case packet_seq is larger than device->peer_seq number, there are
 * outstanding packets on the msock. We wait for them to arrive.
 * In case we are the logically next packet, we update device->peer_seq
 * ourselves. Correctly handles 32bit wrap around.
 *
 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
 *
 * returns 0 if we may process the packet,
 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
{
	struct drbd_device *device = peer_device->device;
	DEFINE_WAIT(wait);
	long timeout;
	int ret = 0, tp;

	if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
		return 0;

	spin_lock(&device->peer_seq_lock);
	for (;;) {
		if (!seq_greater(peer_seq - 1, device->peer_seq)) {
			device->peer_seq = seq_max(device->peer_seq, peer_seq);
			break;
		}

		if (signal_pending(current)) {
			ret = -ERESTARTSYS;
			break;
		}

		rcu_read_lock();
		tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
		rcu_read_unlock();

		if (!tp)
			break;

		/* Only need to wait if two_primaries is enabled */
		prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
		spin_unlock(&device->peer_seq_lock);
		rcu_read_lock();
		timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
		rcu_read_unlock();
		timeout = schedule_timeout(timeout);
		spin_lock(&device->peer_seq_lock);
		if (!timeout) {
			ret = -ETIMEDOUT;
			drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
			break;
		}
	}
	spin_unlock(&device->peer_seq_lock);
	finish_wait(&device->seq_wait, &wait);
	return ret;
}
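
/*
 * Illustrative sketch, not part of the driver: if device->peer_seq is 5 and a
 * P_DATA packet with seq_num 7 arrives on the data socket, packet 6 is still
 * in flight on the meta socket, so the receiver sleeps on seq_wait until
 * update_peer_seq() advances peer_seq to 6; packet 7 is then "logically next"
 * and updates peer_seq itself.
 */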
/* see also bio_flags_to_wire()
 * DRBD_REQ_*, because we need to semantically map the flags to data packet
 * flags and back. We may replicate to other kernel versions. */
static unsigned long wire_flags_to_bio_flags(u32 dpf)
{
	return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
		(dpf & DP_FUA ? REQ_FUA : 0) |
		(dpf & DP_FLUSH ? REQ_PREFLUSH : 0);
}

static unsigned long wire_flags_to_bio_op(u32 dpf)
{
	if (dpf & DP_DISCARD)
		return REQ_OP_WRITE_ZEROES;
	else
		return REQ_OP_WRITE;
}
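
/*
 * Illustrative sketch, not part of the driver: a write submitted on the peer
 * with REQ_FUA | REQ_PREFLUSH is encoded as DP_FUA | DP_FLUSH on the wire and
 * mapped back to REQ_FUA | REQ_PREFLUSH here, so the write-ordering semantics
 * survive the round trip even between different kernel versions.
 */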
static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
				    unsigned int size)
{
	struct drbd_interval *i;

    repeat:
	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
		struct drbd_request *req;
		struct bio_and_error m;

		if (!i->local)
			continue;
		req = container_of(i, struct drbd_request, i);
		if (!(req->rq_state & RQ_POSTPONED))
			continue;
		req->rq_state &= ~RQ_POSTPONED;
		__req_mod(req, NEG_ACKED, &m);
		spin_unlock_irq(&device->resource->req_lock);
		if (m.bio)
			complete_master_bio(device, &m);
		spin_lock_irq(&device->resource->req_lock);
		goto repeat;
	}
}
static int handle_write_conflicts(struct drbd_device *device,
				  struct drbd_peer_request *peer_req)
{
	struct drbd_connection *connection = peer_req->peer_device->connection;
	bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
	sector_t sector = peer_req->i.sector;
	const unsigned int size = peer_req->i.size;
	struct drbd_interval *i;
	bool equal;
	int err;

	/*
	 * Inserting the peer request into the write_requests tree will prevent
	 * new conflicting local requests from being added.
	 */
	drbd_insert_interval(&device->write_requests, &peer_req->i);

    repeat:
	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
		if (i == &peer_req->i)
			continue;

		if (!i->local) {
			/*
			 * Our peer has sent a conflicting remote request; this
			 * should not happen in a two-node setup. Wait for the
			 * earlier peer request to complete.
			 */
			err = drbd_wait_misc(device, i);
			if (err)
				goto out;
			goto repeat;
		}

		equal = i->sector == sector && i->size == size;
		if (resolve_conflicts) {
			/*
			 * If the peer request is fully contained within the
			 * overlapping request, it can be considered overwritten
			 * and thus superseded; otherwise, it will be retried
			 * once all overlapping requests have completed.
			 */
			bool superseded = i->sector <= sector && i->sector +
				       (i->size >> 9) >= sector + (size >> 9);

			if (!equal)
				drbd_alert(device, "Concurrent writes detected: "
					       "local=%llus +%u, remote=%llus +%u, "
					       "assuming %s came first\n",
					  (unsigned long long)i->sector, i->size,
					  (unsigned long long)sector, size,
					  superseded ? "local" : "remote");

			peer_req->w.cb = superseded ? e_send_superseded :
						   e_send_retry_write;
			list_add_tail(&peer_req->w.list, &device->done_ee);
			queue_work(connection->ack_sender, &peer_req->peer_device->send_acks_work);

			err = -ENOENT;
			goto out;
		} else {
			struct drbd_request *req =
				container_of(i, struct drbd_request, i);

			if (!equal)
				drbd_alert(device, "Concurrent writes detected: "
					       "local=%llus +%u, remote=%llus +%u\n",
					  (unsigned long long)i->sector, i->size,
					  (unsigned long long)sector, size);

			if (req->rq_state & RQ_LOCAL_PENDING ||
			    !(req->rq_state & RQ_POSTPONED)) {
				/*
				 * Wait for the node with the discard flag to
				 * decide if this request has been superseded
				 * or needs to be retried.
				 * Requests that have been superseded will
				 * disappear from the write_requests tree.
				 *
				 * In addition, wait for the conflicting
				 * request to finish locally before submitting
				 * the conflicting peer request.
				 */
				err = drbd_wait_misc(device, &req->i);
				if (err) {
					_conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
					fail_postponed_requests(device, sector, size);
					goto out;
				}
				goto repeat;
			}

			/*
			 * Remember to restart the conflicting requests after
			 * the new peer request has completed.
			 */
			peer_req->flags |= EE_RESTART_REQUESTS;
		}
	}
	err = 0;

    out:
	if (err)
		drbd_remove_epoch_entry_interval(device, peer_req);
	return err;
}
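
/*
 * Illustrative sketch, not part of the driver: a local write covering sectors
 * 100..115 (i->sector == 100, i->size == 8192) fully contains a remote write
 * of sectors 104..107 (sector == 104, size == 2048), so the peer request is
 * considered superseded; had the remote write covered 112..119 instead, it
 * would extend past the local one and be retried.
 */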
/* mirrored write */
static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct net_conf *nc;
	sector_t sector;
	struct drbd_peer_request *peer_req;
	struct p_data *p = pi->data;
	u32 peer_seq = be32_to_cpu(p->seq_num);
	int op, op_flags;
	u32 dp_flags;
	int err, tp;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	if (!get_ldev(device)) {
		int err2;

		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
		atomic_inc(&connection->current_epoch->epoch_size);
		err2 = drbd_drain_block(peer_device, pi->size);
		if (!err)
			err = err2;
		return err;
	}

	/*
	 * Corresponding put_ldev done either below (on various errors), or in
	 * drbd_peer_request_endio, if we successfully submit the data at the
	 * end of this function.
	 */

	sector = be64_to_cpu(p->sector);
	peer_req = read_in_block(peer_device, p->block_id, sector, pi);
	if (!peer_req) {
		put_ldev(device);
		return -EIO;
	}

	peer_req->w.cb = e_end_block;
	peer_req->submit_jif = jiffies;
	peer_req->flags |= EE_APPLICATION;

	dp_flags = be32_to_cpu(p->dp_flags);
	op = wire_flags_to_bio_op(dp_flags);
	op_flags = wire_flags_to_bio_flags(dp_flags);
	if (pi->cmd == P_TRIM) {
		D_ASSERT(peer_device, peer_req->i.size > 0);
		D_ASSERT(peer_device, op == REQ_OP_WRITE_ZEROES);
		D_ASSERT(peer_device, peer_req->pages == NULL);
	} else if (peer_req->pages == NULL) {
		D_ASSERT(device, peer_req->i.size == 0);
		D_ASSERT(device, dp_flags & DP_FLUSH);
	}

	if (dp_flags & DP_MAY_SET_IN_SYNC)
		peer_req->flags |= EE_MAY_SET_IN_SYNC;

	spin_lock(&connection->epoch_lock);
	peer_req->epoch = connection->current_epoch;
	atomic_inc(&peer_req->epoch->epoch_size);
	atomic_inc(&peer_req->epoch->active);
	spin_unlock(&connection->epoch_lock);

	rcu_read_lock();
	nc = rcu_dereference(peer_device->connection->net_conf);
	tp = nc->two_primaries;
	if (peer_device->connection->agreed_pro_version < 100) {
		switch (nc->wire_protocol) {
		case DRBD_PROT_C:
			dp_flags |= DP_SEND_WRITE_ACK;
			break;
		case DRBD_PROT_B:
			dp_flags |= DP_SEND_RECEIVE_ACK;
			break;
		}
	}
	rcu_read_unlock();

	if (dp_flags & DP_SEND_WRITE_ACK) {
		peer_req->flags |= EE_SEND_WRITE_ACK;
		inc_unacked(device);
		/* corresponding dec_unacked() in e_end_block()
		 * respective _drbd_clear_done_ee */
	}

	if (dp_flags & DP_SEND_RECEIVE_ACK) {
		/* I really don't like it that the receiver thread
		 * sends on the msock, but anyways */
		drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
	}

	if (tp) {
		/* two primaries implies protocol C */
		D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
		peer_req->flags |= EE_IN_INTERVAL_TREE;
		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
		if (err)
			goto out_interrupted;
		spin_lock_irq(&device->resource->req_lock);
		err = handle_write_conflicts(device, peer_req);
		if (err) {
			spin_unlock_irq(&device->resource->req_lock);
			if (err == -ENOENT) {
				put_ldev(device);
				return 0;
			}
			goto out_interrupted;
		}
	} else {
		update_peer_seq(peer_device, peer_seq);
		spin_lock_irq(&device->resource->req_lock);
	}
	/* TRIM and WRITE_SAME are processed synchronously,
	 * we wait for all pending requests, respectively wait for
	 * active_ee to become empty in drbd_submit_peer_request();
	 * better not add ourselves here. */
	if ((peer_req->flags & (EE_IS_TRIM|EE_WRITE_SAME)) == 0)
		list_add_tail(&peer_req->w.list, &device->active_ee);
	spin_unlock_irq(&device->resource->req_lock);

	if (device->state.conn == C_SYNC_TARGET)
		wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));

	if (device->state.pdsk < D_INCONSISTENT) {
		/* In case we have the only disk of the cluster, */
		drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
		peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
		drbd_al_begin_io(device, &peer_req->i);
		peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
	}

	err = drbd_submit_peer_request(device, peer_req, op, op_flags,
				       DRBD_FAULT_DT_WR);
	if (!err)
		return 0;

	/* don't care for the reason here */
	drbd_err(device, "submit failed, triggering re-connect\n");
	spin_lock_irq(&device->resource->req_lock);
	list_del(&peer_req->w.list);
	drbd_remove_epoch_entry_interval(device, peer_req);
	spin_unlock_irq(&device->resource->req_lock);
	if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
		peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
		drbd_al_complete_io(device, &peer_req->i);
	}

out_interrupted:
	drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT | EV_CLEANUP);
	put_ldev(device);
	drbd_free_peer_req(device, peer_req);
	return err;
}
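
/*
 * Illustrative sketch, not part of the driver: against peers older than
 * protocol version 100, the ack policy follows the wire protocol chosen
 * above. Protocol C sets DP_SEND_WRITE_ACK, so the peer waits for
 * P_WRITE_ACK after the data hit stable storage here; protocol B sets
 * DP_SEND_RECEIVE_ACK and only gets P_RECV_ACK once the data reached this
 * node's memory; protocol A sets neither and never waits for this receiver.
 */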
/* We may throttle resync, if the lower device seems to be busy,
 * and current sync rate is above c_min_rate.
 *
 * To decide whether or not the lower device is busy, we use a scheme similar
 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
 * (more than 64 sectors) of activity we cannot account for with our own resync
 * activity, it obviously is "busy".
 *
 * The current sync rate used here uses only the most recent two step marks,
 * to have a short time average so we can react faster.
 */
bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
		bool throttle_if_app_is_waiting)
{
	struct lc_element *tmp;
	bool throttle = drbd_rs_c_min_rate_throttle(device);

	if (!throttle || throttle_if_app_is_waiting)
		return throttle;

	spin_lock_irq(&device->al_lock);
	tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
	if (tmp) {
		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
		if (test_bit(BME_PRIORITY, &bm_ext->flags))
			throttle = false;
		/* Do not slow down if app IO is already waiting for this extent,
		 * and our progress is necessary for application IO to complete. */
	}
	spin_unlock_irq(&device->al_lock);

	return throttle;
}
bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
{
	struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
	unsigned long db, dt, dbdt;
	unsigned int c_min_rate;
	int curr_events;

	rcu_read_lock();
	c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
	rcu_read_unlock();

	/* feature disabled? */
	if (c_min_rate == 0)
		return false;

	curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
		      (int)part_stat_read(&disk->part0, sectors[1]) -
			atomic_read(&device->rs_sect_ev);

	if (atomic_read(&device->ap_actlog_cnt)
	    || curr_events - device->rs_last_events > 64) {
		unsigned long rs_left;
		int i;

		device->rs_last_events = curr_events;

		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
		 * approx. */
		i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;

		if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
			rs_left = device->ov_left;
		else
			rs_left = drbd_bm_total_weight(device) - device->rs_failed;

		dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
		if (!dt)
			dt++;
		db = device->rs_mark_left[i] - rs_left;
		dbdt = Bit2KB(db/dt);

		if (dbdt > c_min_rate)
			return true;
	}
	return false;
}
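
/*
 * Illustrative sketch, not part of the driver: each bitmap bit covers 4 KiB,
 * so Bit2KB(n) == n * 4. If the step mark sampled dt == 10 seconds ago
 * recorded db == 50000 more out-of-sync bits than remain now, the short-term
 * rate is dbdt == Bit2KB(50000 / 10) == 20000 KiB/s; whenever that exceeds
 * the configured c_min_rate, the resync is fast enough that it may be
 * throttled in favor of application IO.
 */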
static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	sector_t sector;
	sector_t capacity;
	struct drbd_peer_request *peer_req;
	struct digest_info *di = NULL;
	int size, verb;
	unsigned int fault_type;
	struct p_block_req *p = pi->data;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;
	capacity = drbd_get_capacity(device->this_bdev);

	sector = be64_to_cpu(p->sector);
	size   = be32_to_cpu(p->blksize);

	if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
				(unsigned long long)sector, size);
		return -EINVAL;
	}
	if (sector + (size>>9) > capacity) {
		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
				(unsigned long long)sector, size);
		return -EINVAL;
	}

	if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
		verb = 1;
		switch (pi->cmd) {
		case P_DATA_REQUEST:
			drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
			break;
		case P_RS_THIN_REQ:
		case P_RS_DATA_REQUEST:
		case P_CSUM_RS_REQUEST:
		case P_OV_REQUEST:
			drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY, p);
			break;
		case P_OV_REPLY:
			verb = 0;
			dec_rs_pending(device);
			drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
			break;
		default:
			BUG();
		}
		if (verb && __ratelimit(&drbd_ratelimit_state))
			drbd_err(device, "Can not satisfy peer's read request, "
			    "no local data.\n");

		/* drain possibly payload */
		return drbd_drain_block(peer_device, pi->size);
	}

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
			size, GFP_NOIO);
	if (!peer_req) {
		put_ldev(device);
		return -ENOMEM;
	}

	switch (pi->cmd) {
	case P_DATA_REQUEST:
		peer_req->w.cb = w_e_end_data_req;
		fault_type = DRBD_FAULT_DT_RD;
		/* application IO, don't drbd_rs_begin_io */
		peer_req->flags |= EE_APPLICATION;
		goto submit;

	case P_RS_THIN_REQ:
		/* If at some point in the future we have a smart way to
		   find out if this data block is completely deallocated,
		   then we would do something smarter here than reading
		   the block... */
		peer_req->flags |= EE_RS_THIN_REQ;
		/* fall through */
	case P_RS_DATA_REQUEST:
		peer_req->w.cb = w_e_end_rsdata_req;
		fault_type = DRBD_FAULT_RS_RD;
		/* used in the sector offset progress display */
		device->bm_resync_fo = BM_SECT_TO_BIT(sector);
		break;

	case P_OV_REPLY:
	case P_CSUM_RS_REQUEST:
		fault_type = DRBD_FAULT_RS_RD;
		di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
		if (!di)
			goto out_free_e;

		di->digest_size = pi->size;
		di->digest = (((char *)di)+sizeof(struct digest_info));

		peer_req->digest = di;
		peer_req->flags |= EE_HAS_DIGEST;

		if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
			goto out_free_e;

		if (pi->cmd == P_CSUM_RS_REQUEST) {
			D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
			peer_req->w.cb = w_e_end_csum_rs_req;
			/* used in the sector offset progress display */
			device->bm_resync_fo = BM_SECT_TO_BIT(sector);
			/* remember to report stats in drbd_resync_finished */
			device->use_csums = true;
		} else if (pi->cmd == P_OV_REPLY) {
			/* track progress, we may need to throttle */
			atomic_add(size >> 9, &device->rs_sect_in);
			peer_req->w.cb = w_e_end_ov_reply;
			dec_rs_pending(device);
			/* drbd_rs_begin_io done when we sent this request,
			 * but accounting still needs to be done. */
			goto submit_for_resync;
		}
		break;

	case P_OV_REQUEST:
		if (device->ov_start_sector == ~(sector_t)0 &&
		    peer_device->connection->agreed_pro_version >= 90) {
			unsigned long now = jiffies;
			int i;
			device->ov_start_sector = sector;
			device->ov_position = sector;
			device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
			device->rs_total = device->ov_left;
			for (i = 0; i < DRBD_SYNC_MARKS; i++) {
				device->rs_mark_left[i] = device->ov_left;
				device->rs_mark_time[i] = now;
			}
			drbd_info(device, "Online Verify start sector: %llu\n",
					(unsigned long long)sector);
		}
		peer_req->w.cb = w_e_end_ov_req;
		fault_type = DRBD_FAULT_RS_RD;
		break;

	default:
		BUG();
	}

	/* Throttle, drbd_rs_begin_io and submit should become asynchronous
	 * wrt the receiver, but it is not as straightforward as it may seem.
	 * Various places in the resync start and stop logic assume resync
	 * requests are processed in order, requeuing this on the worker thread
	 * introduces a bunch of new code for synchronization between threads.
	 *
	 * Unlimited throttling before drbd_rs_begin_io may stall the resync
	 * "forever", throttling after drbd_rs_begin_io will lock that extent
	 * for application writes for the same time. For now, just throttle
	 * here, where the rest of the code expects the receiver to sleep for
	 * some time. */

	/* Throttle before drbd_rs_begin_io, as that locks out application IO;
	 * this defers syncer requests for some time, before letting at least
	 * one request through. The resync controller on the receiving side
	 * will adapt to the incoming rate accordingly.
	 *
	 * We cannot throttle here if remote is Primary/SyncTarget:
	 * we would also throttle its application reads.
	 * In that case, throttling is done on the SyncTarget only. */

	/* Even though this may be a resync request, we do add to "read_ee";
	 * "sync_ee" is only used for resync WRITEs.
	 * Add to list early, so debugfs can find this request
	 * even if we have to sleep below. */
	spin_lock_irq(&device->resource->req_lock);
	list_add_tail(&peer_req->w.list, &device->read_ee);
	spin_unlock_irq(&device->resource->req_lock);

	update_receiver_timing_details(connection, drbd_rs_should_slow_down);
	if (device->state.peer != R_PRIMARY
	    && drbd_rs_should_slow_down(device, sector, false))
		schedule_timeout_uninterruptible(HZ/10);
	update_receiver_timing_details(connection, drbd_rs_begin_io);
	if (drbd_rs_begin_io(device, sector))
		goto out_free_e;

submit_for_resync:
	atomic_add(size >> 9, &device->rs_sect_ev);

submit:
	update_receiver_timing_details(connection, drbd_submit_peer_request);
	inc_unacked(device);
	if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
				     fault_type) == 0)
		return 0;

	/* don't care for the reason here */
	drbd_err(device, "submit failed, triggering re-connect\n");

out_free_e:
	spin_lock_irq(&device->resource->req_lock);
	list_del(&peer_req->w.list);
	spin_unlock_irq(&device->resource->req_lock);
	/* no drbd_rs_complete_io(), we are dropping the connection anyways */

	put_ldev(device);
	drbd_free_peer_req(device, peer_req);
	return -EIO;
}
/**
 * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
 */
static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
{
	struct drbd_device *device = peer_device->device;
	int self, peer, rv = -100;
	unsigned long ch_self, ch_peer;
	enum drbd_after_sb_p after_sb_0p;

	self = device->ldev->md.uuid[UI_BITMAP] & 1;
	peer = device->p_uuid[UI_BITMAP] & 1;

	ch_peer = device->p_uuid[UI_SIZE];
	ch_self = device->comm_bm_set;

	rcu_read_lock();
	after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
	rcu_read_unlock();
	switch (after_sb_0p) {
	case ASB_CONSENSUS:
	case ASB_DISCARD_SECONDARY:
	case ASB_CALL_HELPER:
	case ASB_VIOLENTLY:
		drbd_err(device, "Configuration error.\n");
		break;
	case ASB_DISCONNECT:
		break;
	case ASB_DISCARD_YOUNGER_PRI:
		if (self == 0 && peer == 1) {
			rv = -1;
			break;
		}
		if (self == 1 && peer == 0) {
			rv =  1;
			break;
		}
		/* Else fall through to one of the other strategies... */
	case ASB_DISCARD_OLDER_PRI:
		if (self == 0 && peer == 1) {
			rv = 1;
			break;
		}
		if (self == 1 && peer == 0) {
			rv = -1;
			break;
		}
		/* Else fall through to one of the other strategies... */
		drbd_warn(device, "Discard younger/older primary did not find a decision\n"
			  "Using discard-least-changes instead\n");
	case ASB_DISCARD_ZERO_CHG:
		if (ch_peer == 0 && ch_self == 0) {
			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
				? -1 : 1;
			break;
		} else {
			if (ch_peer == 0) { rv =  1; break; }
			if (ch_self == 0) { rv = -1; break; }
		}
		if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
			break;
	case ASB_DISCARD_LEAST_CHG:
		if (ch_self < ch_peer)
			rv = -1;
		else if (ch_self > ch_peer)
			rv =  1;
		else /* ( ch_self == ch_peer ) */
		     /* Well, then use something else. */
			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
				? -1 : 1;
		break;
	case ASB_DISCARD_LOCAL:
		rv = -1;
		break;
	case ASB_DISCARD_REMOTE:
		rv =  1;
	}

	return rv;
}
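
/*
 * Illustrative convention, per the handshake-value legend that precedes
 * drbd_uuid_compare() below: rv == 1 means this node wins and becomes sync
 * source, rv == -1 means it loses and becomes sync target, and rv == -100
 * means the policy could not decide, which ultimately leads to a disconnect.
 */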
/**
 * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
 */
static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
{
	struct drbd_device *device = peer_device->device;
	int hg, rv = -100;
	enum drbd_after_sb_p after_sb_1p;

	rcu_read_lock();
	after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
	rcu_read_unlock();
	switch (after_sb_1p) {
	case ASB_DISCARD_YOUNGER_PRI:
	case ASB_DISCARD_OLDER_PRI:
	case ASB_DISCARD_LEAST_CHG:
	case ASB_DISCARD_LOCAL:
	case ASB_DISCARD_REMOTE:
	case ASB_DISCARD_ZERO_CHG:
		drbd_err(device, "Configuration error.\n");
		break;
	case ASB_DISCONNECT:
		break;
	case ASB_CONSENSUS:
		hg = drbd_asb_recover_0p(peer_device);
		if (hg == -1 && device->state.role == R_SECONDARY)
			rv = hg;
		if (hg == 1 && device->state.role == R_PRIMARY)
			rv = hg;
		break;
	case ASB_VIOLENTLY:
		rv = drbd_asb_recover_0p(peer_device);
		break;
	case ASB_DISCARD_SECONDARY:
		return device->state.role == R_PRIMARY ? 1 : -1;
	case ASB_CALL_HELPER:
		hg = drbd_asb_recover_0p(peer_device);
		if (hg == -1 && device->state.role == R_PRIMARY) {
			enum drbd_state_rv rv2;

			/* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
			 * we might be here in C_WF_REPORT_PARAMS which is transient.
			 * we do not need to wait for the after state change work either. */
			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
			if (rv2 != SS_SUCCESS) {
				drbd_khelper(device, "pri-lost-after-sb");
			} else {
				drbd_warn(device, "Successfully gave up primary role.\n");
				rv = hg;
			}
		} else
			rv = hg;
	}

	return rv;
}
/**
 * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
 */
static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
{
	struct drbd_device *device = peer_device->device;
	int hg, rv = -100;
	enum drbd_after_sb_p after_sb_2p;

	rcu_read_lock();
	after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
	rcu_read_unlock();
	switch (after_sb_2p) {
	case ASB_DISCARD_YOUNGER_PRI:
	case ASB_DISCARD_OLDER_PRI:
	case ASB_DISCARD_LEAST_CHG:
	case ASB_DISCARD_LOCAL:
	case ASB_DISCARD_REMOTE:
	case ASB_CONSENSUS:
	case ASB_DISCARD_SECONDARY:
	case ASB_DISCARD_ZERO_CHG:
		drbd_err(device, "Configuration error.\n");
		break;
	case ASB_VIOLENTLY:
		rv = drbd_asb_recover_0p(peer_device);
		break;
	case ASB_DISCONNECT:
		break;
	case ASB_CALL_HELPER:
		hg = drbd_asb_recover_0p(peer_device);
		if (hg == -1) {
			enum drbd_state_rv rv2;

			/* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
			 * we might be here in C_WF_REPORT_PARAMS which is transient.
			 * we do not need to wait for the after state change work either. */
			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
			if (rv2 != SS_SUCCESS) {
				drbd_khelper(device, "pri-lost-after-sb");
			} else {
				drbd_warn(device, "Successfully gave up primary role.\n");
				rv = hg;
			}
		} else
			rv = hg;
	}

	return rv;
}
static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
			   u64 bits, u64 flags)
{
	if (!uuid) {
		drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
		return;
	}
	drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
		  text,
		  (unsigned long long)uuid[UI_CURRENT],
		  (unsigned long long)uuid[UI_BITMAP],
		  (unsigned long long)uuid[UI_HISTORY_START],
		  (unsigned long long)uuid[UI_HISTORY_END],
		  (unsigned long long)bits,
		  (unsigned long long)flags);
}
/*
  100	after split brain try auto recover
    2	C_SYNC_SOURCE set BitMap
    1	C_SYNC_SOURCE use BitMap
    0	no Sync
   -1	C_SYNC_TARGET use BitMap
   -2	C_SYNC_TARGET set BitMap
 -100	after split brain, disconnect
-1000	unrelated data
-1091	requires proto 91
-1096	requires proto 96
 */
static int drbd_uuid_compare(struct drbd_device *const device, enum drbd_role const peer_role, int *rule_nr) __must_hold(local)
{
	struct drbd_peer_device *const peer_device = first_peer_device(device);
	struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
	u64 self, peer;
	int i, j;

	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);

	*rule_nr = 10;
	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
		return 0;

	*rule_nr = 20;
	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
	     peer != UUID_JUST_CREATED)
		return -2;

	*rule_nr = 30;
	if (self != UUID_JUST_CREATED &&
	    (peer == UUID_JUST_CREATED || peer == (u64)0))
		return 2;

	if (self == peer) {
		int rct, dc; /* roles at crash time */

		if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {

			if (connection->agreed_pro_version < 91)
				return -1091;

			if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
			    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
				drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
				drbd_uuid_move_history(device);
				device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
				device->ldev->md.uuid[UI_BITMAP] = 0;

				drbd_uuid_dump(device, "self", device->ldev->md.uuid,
					       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
				*rule_nr = 34;
			} else {
				drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
				*rule_nr = 36;
			}

			return 1;
		}

		if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {

			if (connection->agreed_pro_version < 91)
				return -1091;

			if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
			    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
				drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");

				device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
				device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
				device->p_uuid[UI_BITMAP] = 0UL;

				drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
				*rule_nr = 35;
			} else {
				drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
				*rule_nr = 37;
			}

			return -1;
		}

		/* Common power [off|failure] */
		rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
			(device->p_uuid[UI_FLAGS] & 2);
		/* lowest bit is set when we were primary,
		 * next bit (weight 2) is set when peer was primary */
		*rule_nr = 40;

		/* Neither has the "crashed primary" flag set,
		 * only a replication link hickup. */
		if (rct == 0)
			return 0;

		/* Current UUID equal and no bitmap uuid; does not necessarily
		 * mean this was a "simultaneous hard crash", maybe IO was
		 * frozen, so no UUID-bump happened.
		 * This is a protocol change, overload DRBD_FF_WSAME as flag
		 * for "new-enough" peer DRBD version. */
		if (device->state.role == R_PRIMARY || peer_role == R_PRIMARY) {
			*rule_nr = 41;
			if (!(connection->agreed_features & DRBD_FF_WSAME)) {
				drbd_warn(peer_device, "Equivalent unrotated UUIDs, but current primary present.\n");
				return -(0x10000 | PRO_VERSION_MAX | (DRBD_FF_WSAME << 8));
			}
			if (device->state.role == R_PRIMARY && peer_role == R_PRIMARY) {
				/* At least one has the "crashed primary" bit set,
				 * both are primary now, but neither has rotated its UUIDs?
				 * "Can not happen." */
				drbd_err(peer_device, "Equivalent unrotated UUIDs, but both are primary. Can not resolve this.\n");
				return -100;
			}
			if (device->state.role == R_PRIMARY)
				return 1;
			return -1;
		}

		/* Both are secondary.
		 * Really looks like recovery from simultaneous hard crash.
		 * Check which had been primary before, and arbitrate. */
		switch (rct) {
		case 0: /* !self_pri && !peer_pri */ return 0; /* already handled */
		case 1: /*  self_pri && !peer_pri */ return 1;
		case 2: /* !self_pri &&  peer_pri */ return -1;
		case 3: /*  self_pri &&  peer_pri */
			dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
			return dc ? -1 : 1;
		}
	}

	*rule_nr = 50;
	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
	if (self == peer)
		return -1;

	*rule_nr = 51;
	peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
	if (self == peer) {
		if (connection->agreed_pro_version < 96 ?
		    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
		    (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
		    peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
			/* The last P_SYNC_UUID did not get though. Undo the last start of
			   resync as sync source modifications of the peer's UUIDs. */

			if (connection->agreed_pro_version < 91)
				return -1091;

			device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
			device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];

			drbd_info(device, "Lost last syncUUID packet, corrected:\n");
			drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);

			return -1;
		}
	}

	*rule_nr = 60;
	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
		peer = device->p_uuid[i] & ~((u64)1);
		if (self == peer)
			return -2;
	}

	*rule_nr = 70;
	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
	if (self == peer)
		return 1;

	*rule_nr = 71;
	self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
	if (self == peer) {
		if (connection->agreed_pro_version < 96 ?
		    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
		    (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
		    self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
			/* The last P_SYNC_UUID did not get though. Undo the last start of
			   resync as sync source modifications of our UUIDs. */

			if (connection->agreed_pro_version < 91)
				return -1091;

			__drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
			__drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);

			drbd_info(device, "Last syncUUID did not get through, corrected:\n");
			drbd_uuid_dump(device, "self", device->ldev->md.uuid,
				       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);

			return 1;
		}
	}

	*rule_nr = 80;
	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
		self = device->ldev->md.uuid[i] & ~((u64)1);
		if (self == peer)
			return 2;
	}

	*rule_nr = 90;
	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
	if (self == peer && self != ((u64)0))
		return 100;

	*rule_nr = 100;
	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
		self = device->ldev->md.uuid[i] & ~((u64)1);
		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
			peer = device->p_uuid[j] & ~((u64)1);
			if (self == peer)
				return -100;
		}
	}

	return -1000;
}
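
/*
 * Illustrative sketch, not part of the driver: if this node's bitmap UUID
 * equals the peer's current UUID (rule 70 above), this node had started a
 * resync as sync source that never finished; returning 1 makes it
 * C_SYNC_SOURCE again, reusing the existing bitmap.
 */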
/* drbd_sync_handshake() returns the new conn state on success, or
   CONN_MASK (-1) on failure.
 */
static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
					   enum drbd_role peer_role,
					   enum drbd_disk_state peer_disk) __must_hold(local)
{
	struct drbd_device *device = peer_device->device;
	enum drbd_conns rv = C_MASK;
	enum drbd_disk_state mydisk;
	struct net_conf *nc;
	int hg, rule_nr, rr_conflict, tentative;

	mydisk = device->state.disk;
	if (mydisk == D_NEGOTIATING)
		mydisk = device->new_state_tmp.disk;

	drbd_info(device, "drbd_sync_handshake:\n");

	spin_lock_irq(&device->ldev->md.uuid_lock);
	drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
	drbd_uuid_dump(device, "peer", device->p_uuid,
		       device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);

	hg = drbd_uuid_compare(device, peer_role, &rule_nr);
	spin_unlock_irq(&device->ldev->md.uuid_lock);

	drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);

	if (hg == -1000) {
		drbd_alert(device, "Unrelated data, aborting!\n");
		return C_MASK;
	}
	if (hg < -0x10000) {
		int proto, fflags;
		hg = -hg;
		proto = hg & 0xff;
		fflags = (hg >> 8) & 0xff;
		drbd_alert(device, "To resolve this both sides have to support at least protocol %d and feature flags 0x%x\n",
					proto, fflags);
		return C_MASK;
	}
	if (hg < -1000) {
		drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
		return C_MASK;
	}

	if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
	    (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
		int f = (hg == -100) || abs(hg) == 2;
		hg = mydisk > D_INCONSISTENT ? 1 : -1;
		if (f)
			hg = hg*2;
		drbd_info(device, "Becoming sync %s due to disk states.\n",
			  hg > 0 ? "source" : "target");
	}

	if (abs(hg) == 100)
		drbd_khelper(device, "initial-split-brain");

	rcu_read_lock();
	nc = rcu_dereference(peer_device->connection->net_conf);

	if (hg == 100 || (hg == -100 && nc->always_asbp)) {
		int pcount = (device->state.role == R_PRIMARY)
			   + (peer_role == R_PRIMARY);
		int forced = (hg == -100);

		switch (pcount) {
		case 0:
			hg = drbd_asb_recover_0p(peer_device);
			break;
		case 1:
			hg = drbd_asb_recover_1p(peer_device);
			break;
		case 2:
			hg = drbd_asb_recover_2p(peer_device);
			break;
		}
		if (abs(hg) < 100) {
			drbd_warn(device, "Split-Brain detected, %d primaries, "
				  "automatically solved. Sync from %s node\n",
				  pcount, (hg < 0) ? "peer" : "this");
			if (forced) {
				drbd_warn(device, "Doing a full sync, since"
					  " UUIDs where ambiguous.\n");
				hg = hg*2;
			}
		}
	}

	if (hg == -100) {
		if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
			hg = -1;
		if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
			hg = 1;

		if (abs(hg) < 100)
			drbd_warn(device, "Split-Brain detected, manually solved. "
				  "Sync from %s node\n",
				  (hg < 0) ? "peer" : "this");
	}
	rr_conflict = nc->rr_conflict;
	tentative = nc->tentative;
	rcu_read_unlock();

	if (hg == -100) {
		/* FIXME this log message is not correct if we end up here
		 * after an attempted attach on a diskless node.
		 * We just refuse to attach -- well, we drop the "connection"
		 * to that disk, in a way... */
		drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
		drbd_khelper(device, "split-brain");
		return C_MASK;
	}

	if (hg > 0 && mydisk <= D_INCONSISTENT) {
		drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
		return C_MASK;
	}

	if (hg < 0 && /* by intention we do not use mydisk here. */
	    device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
		switch (rr_conflict) {
		case ASB_CALL_HELPER:
			drbd_khelper(device, "pri-lost");
			/* fall through */
		case ASB_DISCONNECT:
			drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
			return C_MASK;
		case ASB_VIOLENTLY:
			drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
				  "assumption\n");
		}
	}

	if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
		if (hg == 0)
			drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
		else
			drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
				  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
				  abs(hg) >= 2 ? "full" : "bit-map based");
		return C_MASK;
	}

	if (abs(hg) >= 2) {
		drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
		if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
					BM_LOCKED_SET_ALLOWED))
			return C_MASK;
	}

	if (hg > 0) { /* become sync source. */
		rv = C_WF_BITMAP_S;
	} else if (hg < 0) { /* become sync target */
		rv = C_WF_BITMAP_T;
	} else {
		rv = C_CONNECTED;
		if (drbd_bm_total_weight(device)) {
			drbd_info(device, "No resync, but %lu bits in bitmap!\n",
				  drbd_bm_total_weight(device));
		}
	}

	return rv;
}
static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
{
	/* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
	if (peer == ASB_DISCARD_REMOTE)
		return ASB_DISCARD_LOCAL;

	/* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
	if (peer == ASB_DISCARD_LOCAL)
		return ASB_DISCARD_REMOTE;

	/* everything else is valid if they are equal on both sides. */
	return peer;
}
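
/*
 * Illustrative sketch, not part of the driver: "local" and "remote" are
 * relative terms, so a peer configured with discard-remote is pointing at
 * this node's data, which from here is discard-local. Swapping the two lets
 * receive_protocol() compare the peer's policy directly against our own
 * after-sb-* settings.
 */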
static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
{
	struct p_protocol *p = pi->data;
	enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
	int p_proto, p_discard_my_data, p_two_primaries, cf;
	struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
	char integrity_alg[SHARED_SECRET_MAX] = "";
	struct crypto_ahash *peer_integrity_tfm = NULL;
	void *int_dig_in = NULL, *int_dig_vv = NULL;

	p_proto		= be32_to_cpu(p->protocol);
	p_after_sb_0p	= be32_to_cpu(p->after_sb_0p);
	p_after_sb_1p	= be32_to_cpu(p->after_sb_1p);
	p_after_sb_2p	= be32_to_cpu(p->after_sb_2p);
	p_two_primaries = be32_to_cpu(p->two_primaries);
	cf		= be32_to_cpu(p->conn_flags);
	p_discard_my_data = cf & CF_DISCARD_MY_DATA;

	if (connection->agreed_pro_version >= 87) {
		int err;

		if (pi->size > sizeof(integrity_alg))
			return -EIO;
		err = drbd_recv_all(connection, integrity_alg, pi->size);
		if (err)
			return err;
		integrity_alg[SHARED_SECRET_MAX - 1] = 0;
	}

	if (pi->cmd != P_PROTOCOL_UPDATE) {
		clear_bit(CONN_DRY_RUN, &connection->flags);

		if (cf & CF_DRY_RUN)
			set_bit(CONN_DRY_RUN, &connection->flags);

		rcu_read_lock();
		nc = rcu_dereference(connection->net_conf);

		if (p_proto != nc->wire_protocol) {
			drbd_err(connection, "incompatible %s settings\n", "protocol");
			goto disconnect_rcu_unlock;
		}

		if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
			drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
			goto disconnect_rcu_unlock;
		}

		if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
			drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
			goto disconnect_rcu_unlock;
		}

		if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
			drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
			goto disconnect_rcu_unlock;
		}

		if (p_discard_my_data && nc->discard_my_data) {
			drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
			goto disconnect_rcu_unlock;
		}

		if (p_two_primaries != nc->two_primaries) {
			drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
			goto disconnect_rcu_unlock;
		}

		if (strcmp(integrity_alg, nc->integrity_alg)) {
			drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
			goto disconnect_rcu_unlock;
		}

		rcu_read_unlock();
	}

	if (integrity_alg[0]) {
		int hash_size;

		/*
		 * We can only change the peer data integrity algorithm
		 * here. Changing our own data integrity algorithm
		 * requires that we send a P_PROTOCOL_UPDATE packet at
		 * the same time; otherwise, the peer has no way to
		 * tell between which packets the algorithm should
		 * change.
		 */

		peer_integrity_tfm = crypto_alloc_ahash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
		if (IS_ERR(peer_integrity_tfm)) {
			peer_integrity_tfm = NULL;
			drbd_err(connection, "peer data-integrity-alg %s not supported\n",
				 integrity_alg);
			goto disconnect;
		}

		hash_size = crypto_ahash_digestsize(peer_integrity_tfm);
		int_dig_in = kmalloc(hash_size, GFP_KERNEL);
		int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
		if (!(int_dig_in && int_dig_vv)) {
			drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
			goto disconnect;
		}
	}

	new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
	if (!new_net_conf) {
		drbd_err(connection, "Allocation of new net_conf failed\n");
		goto disconnect;
	}

	mutex_lock(&connection->data.mutex);
	mutex_lock(&connection->resource->conf_update);
	old_net_conf = connection->net_conf;
	*new_net_conf = *old_net_conf;

	new_net_conf->wire_protocol = p_proto;
	new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
	new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
	new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
	new_net_conf->two_primaries = p_two_primaries;

	rcu_assign_pointer(connection->net_conf, new_net_conf);
	mutex_unlock(&connection->resource->conf_update);
	mutex_unlock(&connection->data.mutex);

	crypto_free_ahash(connection->peer_integrity_tfm);
	kfree(connection->int_dig_in);
	kfree(connection->int_dig_vv);
	connection->peer_integrity_tfm = peer_integrity_tfm;
	connection->int_dig_in = int_dig_in;
	connection->int_dig_vv = int_dig_vv;

	if (strcmp(old_net_conf->integrity_alg, integrity_alg))
		drbd_info(connection, "peer data-integrity-alg: %s\n",
			  integrity_alg[0] ? integrity_alg : "(none)");

	synchronize_rcu();
	kfree(old_net_conf);
	return 0;

disconnect_rcu_unlock:
	rcu_read_unlock();
disconnect:
	crypto_free_ahash(peer_integrity_tfm);
	kfree(int_dig_in);
	kfree(int_dig_vv);
	conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
	return -EIO;
}
/*
 * input: alg name, feature name
 * return: NULL (alg name was "")
 *         ERR_PTR(error) if something goes wrong
 *         or the crypto hash ptr, if it worked out ok. */
static struct crypto_ahash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
		const char *alg, const char *name)
{
	struct crypto_ahash *tfm;

	if (!alg[0])
		return NULL;

	tfm = crypto_alloc_ahash(alg, 0, CRYPTO_ALG_ASYNC);
	if (IS_ERR(tfm)) {
		drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
			 alg, name, PTR_ERR(tfm));
		return tfm;
	}
	return tfm;
}
static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
{
	void *buffer = connection->data.rbuf;
	int size = pi->size;

	while (size) {
		int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
		s = drbd_recv(connection, buffer, s);
		if (s <= 0) {
			if (s < 0)
				return s;
			break;
		}
		size -= s;
	}
	if (size)
		return -EIO;
	return 0;
}
/*
 * config_unknown_volume  -  device configuration command for unknown volume
 *
 * When a device is added to an existing connection, the node on which the
 * device is added first will send configuration commands to its peer but the
 * peer will not know about the device yet. It will warn and ignore these
 * commands. Once the device is added on the second node, the second node will
 * send the same device configuration commands, but in the other direction.
 *
 * (We can also end up here if drbd is misconfigured.)
 */
static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
{
	drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
		  cmdname(pi->cmd), pi->vnr);
	return ignore_remaining_packet(connection, pi);
}
static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_rs_param_95 *p;
	unsigned int header_size, data_size, exp_max_sz;
	struct crypto_ahash *verify_tfm = NULL;
	struct crypto_ahash *csums_tfm = NULL;
	struct net_conf *old_net_conf, *new_net_conf = NULL;
	struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
	const int apv = connection->agreed_pro_version;
	struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
	unsigned int fifo_size = 0;
	int err;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return config_unknown_volume(connection, pi);
	device = peer_device->device;

	exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
		    : apv == 88 ? sizeof(struct p_rs_param)
					+ SHARED_SECRET_MAX
		    : apv <= 94 ? sizeof(struct p_rs_param_89)
		    : /* apv >= 95 */ sizeof(struct p_rs_param_95);

	if (pi->size > exp_max_sz) {
		drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
			 pi->size, exp_max_sz);
		return -EIO;
	}

	if (apv <= 88) {
		header_size = sizeof(struct p_rs_param);
		data_size = pi->size - header_size;
	} else if (apv <= 94) {
		header_size = sizeof(struct p_rs_param_89);
		data_size = pi->size - header_size;
		D_ASSERT(device, data_size == 0);
	} else {
		header_size = sizeof(struct p_rs_param_95);
		data_size = pi->size - header_size;
		D_ASSERT(device, data_size == 0);
	}

	/* initialize verify_alg and csums_alg */
	p = pi->data;
	memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);

	err = drbd_recv_all(peer_device->connection, p, header_size);
	if (err)
		return err;

	mutex_lock(&connection->resource->conf_update);
	old_net_conf = peer_device->connection->net_conf;
	if (get_ldev(device)) {
		new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
		if (!new_disk_conf) {
			put_ldev(device);
			mutex_unlock(&connection->resource->conf_update);
			drbd_err(device, "Allocation of new disk_conf failed\n");
			return -ENOMEM;
		}

		old_disk_conf = device->ldev->disk_conf;
		*new_disk_conf = *old_disk_conf;

		new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
	}

	if (apv >= 88) {
		if (apv == 88) {
			if (data_size > SHARED_SECRET_MAX || data_size == 0) {
				drbd_err(device, "verify-alg of wrong size, "
					 "peer wants %u, accepting only up to %u byte\n",
					 data_size, SHARED_SECRET_MAX);
				err = -EIO;
				goto reconnect;
			}

			err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
			if (err)
				goto reconnect;
			/* we expect NUL terminated string */
			/* but just in case someone tries to be evil */
			D_ASSERT(device, p->verify_alg[data_size-1] == 0);
			p->verify_alg[data_size-1] = 0;

		} else /* apv >= 89 */ {
			/* we still expect NUL terminated strings */
			/* but just in case someone tries to be evil */
			D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
			D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
		}

		if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
			if (device->state.conn == C_WF_REPORT_PARAMS) {
				drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
					 old_net_conf->verify_alg, p->verify_alg);
				goto disconnect;
			}
			verify_tfm = drbd_crypto_alloc_digest_safe(device,
					p->verify_alg, "verify-alg");
			if (IS_ERR(verify_tfm)) {
				verify_tfm = NULL;
				goto disconnect;
			}
		}

		if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
			if (device->state.conn == C_WF_REPORT_PARAMS) {
				drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
					 old_net_conf->csums_alg, p->csums_alg);
				goto disconnect;
			}
			csums_tfm = drbd_crypto_alloc_digest_safe(device,
					p->csums_alg, "csums-alg");
			if (IS_ERR(csums_tfm)) {
				csums_tfm = NULL;
				goto disconnect;
			}
		}

		if (apv > 94 && new_disk_conf) {
			new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
			new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
			new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
			new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);

			fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
			if (fifo_size != device->rs_plan_s->size) {
				new_plan = fifo_alloc(fifo_size);
				if (!new_plan) {
					drbd_err(device, "kmalloc of fifo_buffer failed");
					put_ldev(device);
					goto disconnect;
				}
			}
		}

		if (verify_tfm || csums_tfm) {
			new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
			if (!new_net_conf) {
				drbd_err(device, "Allocation of new net_conf failed\n");
				goto disconnect;
			}

			*new_net_conf = *old_net_conf;

			if (verify_tfm) {
				strcpy(new_net_conf->verify_alg, p->verify_alg);
				new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
				crypto_free_ahash(peer_device->connection->verify_tfm);
				peer_device->connection->verify_tfm = verify_tfm;
				drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
			}
			if (csums_tfm) {
				strcpy(new_net_conf->csums_alg, p->csums_alg);
				new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
				crypto_free_ahash(peer_device->connection->csums_tfm);
				peer_device->connection->csums_tfm = csums_tfm;
				drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
			}
			rcu_assign_pointer(connection->net_conf, new_net_conf);
		}
	}

	if (new_disk_conf) {
		rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
		put_ldev(device);
	}

	if (new_plan) {
		old_plan = device->rs_plan_s;
		rcu_assign_pointer(device->rs_plan_s, new_plan);
	}

	mutex_unlock(&connection->resource->conf_update);
	synchronize_rcu();
	if (new_net_conf)
		kfree(old_net_conf);
	kfree(old_disk_conf);
	kfree(old_plan);

	return 0;

reconnect:
	if (new_disk_conf) {
		put_ldev(device);
		kfree(new_disk_conf);
	}
	mutex_unlock(&connection->resource->conf_update);
	return -EIO;

disconnect:
	kfree(new_plan);
	if (new_disk_conf) {
		put_ldev(device);
		kfree(new_disk_conf);
	}
	mutex_unlock(&connection->resource->conf_update);
	/* just for completeness: actually not needed,
	 * as this is not reached if csums_tfm was ok. */
	crypto_free_ahash(csums_tfm);
	/* but free the verify_tfm again, if csums_tfm did not work out */
	crypto_free_ahash(verify_tfm);
	conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
	return -EIO;
}
/* warn if the arguments differ by more than 12.5% */
static void warn_if_differ_considerably(struct drbd_device *device,
	const char *s, sector_t a, sector_t b)
{
	sector_t d;
	if (a == 0 || b == 0)
		return;
	d = (a > b) ? (a - b) : (b - a);
	if (d > (a>>3) || d > (b>>3))
		drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
			  (unsigned long long)a, (unsigned long long)b);
}
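
/*
 * Illustrative sketch, not part of the driver: a >> 3 is a/8, i.e. 12.5%.
 * With a == 1000 and b == 1100 sectors, d == 100 does not exceed
 * 1000/8 == 125, so nothing is logged; with b == 1200, d == 200 exceeds 125
 * and the warning fires.
 */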
static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_sizes *p = pi->data;
	struct o_qlim *o = (connection->agreed_features & DRBD_FF_WSAME) ? p->qlim : NULL;
	enum determine_dev_size dd = DS_UNCHANGED;
	sector_t p_size, p_usize, p_csize, my_usize;
	int ldsc = 0; /* local disk size changed */
	enum dds_flags ddsf;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return config_unknown_volume(connection, pi);
	device = peer_device->device;

	p_size = be64_to_cpu(p->d_size);
	p_usize = be64_to_cpu(p->u_size);
	p_csize = be64_to_cpu(p->c_size);

	/* just store the peer's disk size for now.
	 * we still need to figure out whether we accept that. */
	device->p_size = p_size;

	if (get_ldev(device)) {
		sector_t new_size, cur_size;
		rcu_read_lock();
		my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
		rcu_read_unlock();

		warn_if_differ_considerably(device, "lower level device sizes",
			   p_size, drbd_get_max_capacity(device->ldev));
		warn_if_differ_considerably(device, "user requested size",
					    p_usize, my_usize);

		/* if this is the first connect, or an otherwise expected
		 * param exchange, choose the minimum */
		if (device->state.conn == C_WF_REPORT_PARAMS)
			p_usize = min_not_zero(my_usize, p_usize);

		/* Never shrink a device with usable data during connect.
		   But allow online shrinking if we are connected. */
		new_size = drbd_new_dev_size(device, device->ldev, p_usize, 0);
		cur_size = drbd_get_capacity(device->this_bdev);
		if (new_size < cur_size &&
		    device->state.disk >= D_OUTDATED &&
		    device->state.conn < C_CONNECTED) {
			drbd_err(device, "The peer's disk size is too small! (%llu < %llu sectors)\n",
					(unsigned long long)new_size, (unsigned long long)cur_size);
			conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
			put_ldev(device);
			return -EIO;
		}

		if (my_usize != p_usize) {
			struct disk_conf *old_disk_conf, *new_disk_conf = NULL;

			new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
			if (!new_disk_conf) {
				drbd_err(device, "Allocation of new disk_conf failed\n");
				put_ldev(device);
				return -ENOMEM;
			}

			mutex_lock(&connection->resource->conf_update);
			old_disk_conf = device->ldev->disk_conf;
			*new_disk_conf = *old_disk_conf;
			new_disk_conf->disk_size = p_usize;

			rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
			mutex_unlock(&connection->resource->conf_update);
			synchronize_rcu();
			kfree(old_disk_conf);

			drbd_info(device, "Peer sets u_size to %lu sectors\n",
				  (unsigned long)my_usize);
		}

		put_ldev(device);
	}

	device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
	/* Leave drbd_reconsider_queue_parameters() before drbd_determine_dev_size().
	   In case we cleared the QUEUE_FLAG_DISCARD from our queue in
	   drbd_reconsider_queue_parameters(), we can be sure that after
	   drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
	ddsf = be16_to_cpu(p->dds_flags);
	if (get_ldev(device)) {
		drbd_reconsider_queue_parameters(device, device->ldev, o);
		dd = drbd_determine_dev_size(device, ddsf, NULL);
		put_ldev(device);
		if (dd == DS_ERROR)
			return -EIO;
		drbd_md_sync(device);
	} else {
		/*
		 * I am diskless, need to accept the peer's *current* size.
		 * I must NOT accept the peers backing disk size,
		 * it may have been larger than mine all along...
		 *
		 * At this point, the peer knows more about my disk, or at
		 * least about what we last agreed upon, than myself.
		 * So if his c_size is less than his d_size, the most likely
		 * reason is that *my* d_size was smaller last time we checked.
		 *
		 * However, if he sends a zero current size,
		 * take his (user-capped or) backing disk size anyways.
		 */
		drbd_reconsider_queue_parameters(device, NULL, o);
		drbd_set_my_capacity(device, p_csize ?: p_usize ?: p_size);
	}

	if (get_ldev(device)) {
		if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
			device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
			ldsc = 1;
		}

		put_ldev(device);
	}

	if (device->state.conn > C_WF_REPORT_PARAMS) {
		if (be64_to_cpu(p->c_size) !=
		    drbd_get_capacity(device->this_bdev) || ldsc) {
			/* we have different sizes, probably peer
			 * needs to know my new size... */
			drbd_send_sizes(peer_device, 0, ddsf);
		}
		if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
		    (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
			if (device->state.pdsk >= D_INCONSISTENT &&
			    device->state.disk >= D_INCONSISTENT) {
				if (ddsf & DDSF_NO_RESYNC)
					drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
				else
					resync_after_online_grow(device);
			} else
				set_bit(RESYNC_AFTER_NEG, &device->flags);
		}
	}

	return 0;
}
static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_uuids *p = pi->data;
	u64 *p_uuid;
	int i, updated_uuids = 0;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return config_unknown_volume(connection, pi);
	device = peer_device->device;

	p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
	if (!p_uuid) {
		drbd_err(device, "kmalloc of p_uuid failed\n");
		return false;
	}

	for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
		p_uuid[i] = be64_to_cpu(p->uuid[i]);

	kfree(device->p_uuid);
	device->p_uuid = p_uuid;

	if (device->state.conn < C_CONNECTED &&
	    device->state.disk < D_INCONSISTENT &&
	    device->state.role == R_PRIMARY &&
	    (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
		drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
		    (unsigned long long)device->ed_uuid);
		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
		return -EIO;
	}

	if (get_ldev(device)) {
		int skip_initial_sync =
			device->state.conn == C_CONNECTED &&
			peer_device->connection->agreed_pro_version >= 90 &&
			device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
			(p_uuid[UI_FLAGS] & 8);
		if (skip_initial_sync) {
			drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
			drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
					"clear_n_write from receive_uuids",
					BM_LOCKED_TEST_ALLOWED);
			_drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
			_drbd_uuid_set(device, UI_BITMAP, 0);
			_drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
					CS_VERBOSE, NULL);
			drbd_md_sync(device);
			updated_uuids = 1;
		}
		put_ldev(device);
	} else if (device->state.disk < D_INCONSISTENT &&
		   device->state.role == R_PRIMARY) {
		/* I am a diskless primary, the peer just created a new current UUID
		   for me. */
		updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
	}

	/* Before we test for the disk state, we should wait until an eventually
	   ongoing cluster wide state change is finished. That is important if
	   we are primary and are detaching from our disk. We need to see the
	   new disk state... */
	mutex_lock(device->state_mutex);
	mutex_unlock(device->state_mutex);
	if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
		updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);

	if (updated_uuids)
		drbd_print_uuids(device, "receiver updated UUIDs to");

	return 0;
}
/**
 * convert_state() - Converts the peer's view of the cluster state to our point of view
 * @ps: The state as seen by the peer.
 */
static union drbd_state convert_state(union drbd_state ps)
{
	union drbd_state ms;

	static enum drbd_conns c_tab[] = {
		[C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
		[C_CONNECTED] = C_CONNECTED,

		[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
		[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
		[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
		[C_VERIFY_S]       = C_VERIFY_T,
		[C_MASK]   = C_MASK,
	};

	ms.i = ps.i;

	ms.conn = c_tab[ps.conn];
	ms.peer = ps.role;
	ms.role = ps.peer;
	ms.pdsk = ps.disk;
	ms.disk = ps.pdsk;
	ms.peer_isp = (ps.aftr_isp | ps.user_isp);

	return ms;
}
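/*
 * Illustration: convert_state() mirrors a state word.  What the peer calls
 * "my role/disk" is our "peer role/peer disk" and vice versa; only the
 * connection states that are genuinely asymmetric (StartingSyncS vs
 * StartingSyncT, VerifyS vs VerifyT, ...) need the c_tab[] lookup.  For
 * example, a peer-sent (role=Primary, peer=Secondary, conn=StartingSyncS)
 * arrives here as (role=Secondary, peer=Primary, conn=StartingSyncT) from
 * our point of view.
 */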
static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_req_state *p = pi->data;
	union drbd_state mask, val;
	enum drbd_state_rv rv;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	mask.i = be32_to_cpu(p->mask);
	val.i = be32_to_cpu(p->val);

	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
	    mutex_is_locked(device->state_mutex)) {
		drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
		return 0;
	}

	mask = convert_state(mask);
	val = convert_state(val);

	rv = drbd_change_state(device, CS_VERBOSE, mask, val);
	drbd_send_sr_reply(peer_device, rv);

	drbd_md_sync(device);

	return 0;
}
static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
{
	struct p_req_state *p = pi->data;
	union drbd_state mask, val;
	enum drbd_state_rv rv;

	mask.i = be32_to_cpu(p->mask);
	val.i = be32_to_cpu(p->val);

	if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
	    mutex_is_locked(&connection->cstate_mutex)) {
		conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
		return 0;
	}

	mask = convert_state(mask);
	val = convert_state(val);

	rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
	conn_send_sr_reply(connection, rv);

	return 0;
}
static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_state *p = pi->data;
	union drbd_state os, ns, peer_state;
	enum drbd_disk_state real_peer_disk;
	enum chg_state_flags cs_flags;
	int rv;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return config_unknown_volume(connection, pi);
	device = peer_device->device;

	peer_state.i = be32_to_cpu(p->state);

	real_peer_disk = peer_state.disk;
	if (peer_state.disk == D_NEGOTIATING) {
		real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
		drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
	}

	spin_lock_irq(&device->resource->req_lock);
 retry:
	os = ns = drbd_read_state(device);
	spin_unlock_irq(&device->resource->req_lock);

	/* If some other part of the code (ack_receiver thread, timeout)
	 * already decided to close the connection again,
	 * we must not "re-establish" it here. */
	if (os.conn <= C_TEAR_DOWN)
		return -ECONNRESET;

	/* If this is the "end of sync" confirmation, usually the peer disk
	 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
	 * set) resync started in PausedSyncT, or if the timing of pause-/
	 * unpause-sync events has been "just right", the peer disk may
	 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
	 */
	if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
	    real_peer_disk == D_UP_TO_DATE &&
	    os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
		/* If we are (becoming) SyncSource, but peer is still in sync
		 * preparation, ignore its uptodate-ness to avoid flapping, it
		 * will change to inconsistent once the peer reaches active
		 * syncing states.
		 * It may have changed syncer-paused flags, however, so we
		 * cannot ignore this completely. */
		if (peer_state.conn > C_CONNECTED &&
		    peer_state.conn < C_SYNC_SOURCE)
			real_peer_disk = D_INCONSISTENT;

		/* if peer_state changes to connected at the same time,
		 * it explicitly notifies us that it finished resync.
		 * Maybe we should finish it up, too? */
		else if (os.conn >= C_SYNC_SOURCE &&
			 peer_state.conn == C_CONNECTED) {
			if (drbd_bm_total_weight(device) <= device->rs_failed)
				drbd_resync_finished(device);
			return 0;
		}
	}

	/* explicit verify finished notification, stop sector reached. */
	if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
	    peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
		ov_out_of_sync_print(device);
		drbd_resync_finished(device);
		return 0;
	}

	/* peer says his disk is inconsistent, while we think it is uptodate,
	 * and this happens while the peer still thinks we have a sync going on,
	 * but we think we are already done with the sync.
	 * We ignore this to avoid flapping pdsk.
	 * This should not happen, if the peer is a recent version of drbd. */
	if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
	    os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
		real_peer_disk = D_UP_TO_DATE;

	if (ns.conn == C_WF_REPORT_PARAMS)
		ns.conn = C_CONNECTED;

	if (peer_state.conn == C_AHEAD)
		ns.conn = C_BEHIND;

	if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
	    get_ldev_if_state(device, D_NEGOTIATING)) {
		int cr; /* consider resync */

		/* if we established a new connection */
		cr  = (os.conn < C_CONNECTED);
		/* if we had an established connection
		 * and one of the nodes newly attaches a disk */
		cr |= (os.conn == C_CONNECTED &&
		       (peer_state.disk == D_NEGOTIATING ||
			os.disk == D_NEGOTIATING));
		/* if we have both been inconsistent, and the peer has been
		 * forced to be UpToDate with --overwrite-data */
		cr |= test_bit(CONSIDER_RESYNC, &device->flags);
		/* if we had been plain connected, and the admin requested to
		 * start a sync by "invalidate" or "invalidate-remote" */
		cr |= (os.conn == C_CONNECTED &&
		       (peer_state.conn >= C_STARTING_SYNC_S &&
			peer_state.conn <= C_WF_BITMAP_T));

		if (cr)
			ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);

		put_ldev(device);
		if (ns.conn == C_MASK) {
			ns.conn = C_CONNECTED;
			if (device->state.disk == D_NEGOTIATING) {
				drbd_force_state(device, NS(disk, D_FAILED));
			} else if (peer_state.disk == D_NEGOTIATING) {
				drbd_err(device, "Disk attach process on the peer node was aborted.\n");
				peer_state.disk = D_DISKLESS;
				real_peer_disk = D_DISKLESS;
			} else {
				if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
					return -EIO;
				D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
				conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
				return -EIO;
			}
		}
	}

	spin_lock_irq(&device->resource->req_lock);
	if (os.i != drbd_read_state(device).i)
		goto retry;
	clear_bit(CONSIDER_RESYNC, &device->flags);
	ns.peer = peer_state.role;
	ns.pdsk = real_peer_disk;
	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
	if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
		ns.disk = device->new_state_tmp.disk;
	cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
	if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
	    test_bit(NEW_CUR_UUID, &device->flags)) {
		/* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
		   for temporal network outages! */
		spin_unlock_irq(&device->resource->req_lock);
		drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
		tl_clear(peer_device->connection);
		drbd_uuid_new_current(device);
		clear_bit(NEW_CUR_UUID, &device->flags);
		conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
		return -EIO;
	}
	rv = _drbd_set_state(device, ns, cs_flags, NULL);
	ns = drbd_read_state(device);
	spin_unlock_irq(&device->resource->req_lock);

	if (rv < SS_SUCCESS) {
		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
		return -EIO;
	}

	if (os.conn > C_WF_REPORT_PARAMS) {
		if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
		    peer_state.disk != D_NEGOTIATING) {
			/* we want resync, peer has not yet decided to sync... */
			/* Nowadays only used when forcing a node into primary role and
			   setting its disk to UpToDate with that */
			drbd_send_uuids(peer_device);
			drbd_send_current_state(peer_device);
		}
	}

	clear_bit(DISCARD_MY_DATA, &device->flags);

	drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */

	return 0;
}
static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_rs_uuid *p = pi->data;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	wait_event(device->misc_wait,
		   device->state.conn == C_WF_SYNC_UUID ||
		   device->state.conn == C_BEHIND ||
		   device->state.conn < C_CONNECTED ||
		   device->state.disk < D_NEGOTIATING);

	/* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */

	/* Here the _drbd_uuid_ functions are right, current should
	   _not_ be rotated into the history */
	if (get_ldev_if_state(device, D_NEGOTIATING)) {
		_drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
		_drbd_uuid_set(device, UI_BITMAP, 0UL);

		drbd_print_uuids(device, "updated sync uuid");
		drbd_start_resync(device, C_SYNC_TARGET);

		put_ldev(device);
	} else
		drbd_err(device, "Ignoring SyncUUID packet!\n");

	return 0;
}
/*
 * receive_bitmap_plain
 *
 * Return 0 when done, 1 when another iteration is needed, and a negative error
 * code upon failure.
 */
static int
receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
		     unsigned long *p, struct bm_xfer_ctx *c)
{
	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
				 drbd_header_size(peer_device->connection);
	unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
				       c->bm_words - c->word_offset);
	unsigned int want = num_words * sizeof(*p);
	int err;

	if (want != size) {
		drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
		return -EIO;
	}
	if (want == 0)
		return 0;
	err = drbd_recv_all(peer_device->connection, p, want);
	if (err)
		return err;

	drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);

	c->word_offset += num_words;
	c->bit_offset = c->word_offset * BITS_PER_LONG;
	if (c->bit_offset > c->bm_bits)
		c->bit_offset = c->bm_bits;

	return 1;
}
static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
{
	return (enum drbd_bitmap_code)(p->encoding & 0x0f);
}

static int dcbp_get_start(struct p_compressed_bm *p)
{
	return (p->encoding & 0x80) != 0;
}

static int dcbp_get_pad_bits(struct p_compressed_bm *p)
{
	return (p->encoding >> 4) & 0x7;
}
/*
 * recv_bm_rle_bits
 *
 * Return 0 when done, 1 when another iteration is needed, and a negative error
 * code upon failure.
 */
static int
recv_bm_rle_bits(struct drbd_peer_device *peer_device,
		 struct p_compressed_bm *p,
		 struct bm_xfer_ctx *c,
		 unsigned int len)
{
	struct bitstream bs;
	u64 look_ahead;
	u64 rl;
	u64 tmp;
	unsigned long s = c->bit_offset;
	unsigned long e;
	int toggle = dcbp_get_start(p);
	int have;
	int bits;

	bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));

	bits = bitstream_get_bits(&bs, &look_ahead, 64);
	if (bits < 0)
		return -EIO;

	for (have = bits; have > 0; s += rl, toggle = !toggle) {
		bits = vli_decode_bits(&rl, look_ahead);
		if (bits <= 0)
			return -EIO;

		if (toggle) {
			e = s + rl - 1;
			if (e >= c->bm_bits) {
				drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
				return -EIO;
			}
			_drbd_bm_set_bits(peer_device->device, s, e);
		}

		if (have < bits) {
			drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
				have, bits, look_ahead,
				(unsigned int)(bs.cur.b - p->code),
				(unsigned int)bs.buf_len);
			return -EIO;
		}
		/* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
		if (likely(bits < 64))
			look_ahead >>= bits;
		else
			look_ahead = 0;
		have -= bits;

		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
		if (bits < 0)
			return -EIO;
		look_ahead |= tmp << have;
		have += bits;
	}

	c->bit_offset = s;
	bm_xfer_ctx_bit_to_word_offset(c);

	return (s != c->bm_bits);
}
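/*
 * Decoding sketch: the payload is a sequence of variable-length-integer
 * (VLI) encoded run lengths.  Runs alternate between "clear" and "set"
 * ranges, starting with whatever dcbp_get_start() says, so only the run
 * lengths themselves are transmitted.  A bitmap like 0...01...10...0 with
 * runs of 1000/20/979 bits travels as just the three VLI codes for 1000,
 * 20 and 979, plus the one start-toggle bit in the encoding byte.
 */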
/*
 * decode_bitmap_c
 *
 * Return 0 when done, 1 when another iteration is needed, and a negative error
 * code upon failure.
 */
static int
decode_bitmap_c(struct drbd_peer_device *peer_device,
		struct p_compressed_bm *p,
		struct bm_xfer_ctx *c,
		unsigned int len)
{
	if (dcbp_get_code(p) == RLE_VLI_Bits)
		return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));

	/* other variants had been implemented for evaluation,
	 * but have been dropped as this one turned out to be "best"
	 * during all our tests. */

	drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
	conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
	return -EIO;
}
void INFO_bm_xfer_stats(struct drbd_device *device,
		const char *direction, struct bm_xfer_ctx *c)
{
	/* what would it take to transfer it "plaintext" */
	unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
	unsigned int plain =
		header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
		c->bm_words * sizeof(unsigned long);
	unsigned int total = c->bytes[0] + c->bytes[1];
	unsigned int r;

	/* total can not be zero. but just in case: */
	if (total == 0)
		return;

	/* don't report if not compressed */
	if (total >= plain)
		return;

	/* total < plain. check for overflow, still */
	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
				    : (1000 * total / plain);

	if (r > 1000)
		r = 1000;

	r = 1000 - r;
	drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
	     "total %u; compression: %u.%u%%\n",
			direction,
			c->bytes[1], c->packets[1],
			c->bytes[0], c->packets[0],
			total, r/10, r % 10);
}
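/*
 * Worked example for the ratio above: with a hypothetical plain size of
 * 1000000 bytes and a compressed total of 12345 bytes,
 * r = 1000 * 12345 / 1000000 = 12, then r = 1000 - 12 = 988, and the
 * message reports "compression: 98.8%".
 */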
/* Since we are processing the bitfield from lower addresses to higher,
   it does not matter whether we process it in 32 bit or 64 bit chunks,
   as long as it is little endian. (Understand it as byte stream,
   beginning with the lowest byte...) If we were to use big endian,
   we would need to process it from the highest address to the lowest,
   in order to be agnostic to the 32 vs 64 bits issue.

   Returns 0 on success, a negative error code otherwise. */
static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct bm_xfer_ctx c;
	int err;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
	/* you are supposed to send additional out-of-sync information
	 * if you actually set bits during this phase */

	c = (struct bm_xfer_ctx) {
		.bm_bits = drbd_bm_bits(device),
		.bm_words = drbd_bm_words(device),
	};

	for(;;) {
		if (pi->cmd == P_BITMAP)
			err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
		else if (pi->cmd == P_COMPRESSED_BITMAP) {
			/* MAYBE: sanity check that we speak proto >= 90,
			 * and the feature is enabled! */
			struct p_compressed_bm *p = pi->data;

			if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
				drbd_err(device, "ReportCBitmap packet too large\n");
				err = -EIO;
				goto out;
			}
			if (pi->size <= sizeof(*p)) {
				drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
				err = -EIO;
				goto out;
			}
			err = drbd_recv_all(peer_device->connection, p, pi->size);
			if (err)
				goto out;
			err = decode_bitmap_c(peer_device, p, &c, pi->size);
		} else {
			drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
			err = -EIO;
			goto out;
		}

		c.packets[pi->cmd == P_BITMAP]++;
		c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;

		if (err <= 0) {
			if (err < 0)
				goto out;
			break;
		}
		err = drbd_recv_header(peer_device->connection, pi);
		if (err)
			goto out;
	}

	INFO_bm_xfer_stats(device, "receive", &c);

	if (device->state.conn == C_WF_BITMAP_T) {
		enum drbd_state_rv rv;

		err = drbd_send_bitmap(device);
		if (err)
			goto out;
		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
		rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
		D_ASSERT(device, rv == SS_SUCCESS);
	} else if (device->state.conn != C_WF_BITMAP_S) {
		/* admin may have requested C_DISCONNECTING,
		 * other threads may have noticed network errors */
		drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
		    drbd_conn_str(device->state.conn));
	}
	err = 0;

 out:
	drbd_bm_unlock(device);
	if (!err && device->state.conn == C_WF_BITMAP_S)
		drbd_start_resync(device, C_SYNC_SOURCE);
	return err;
}
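/*
 * Note the asymmetry at the end of receive_bitmap(): the exchange pairs a
 * WFBitMapS node with a WFBitMapT peer.  The target answers with its own
 * bitmap and then requests C_WF_SYNC_UUID; the source side only kicks off
 * drbd_start_resync(C_SYNC_SOURCE), after the bitmap unlock, once its own
 * receive completed without error.
 */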
static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
{
	drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
		 pi->cmd, pi->size);

	return ignore_remaining_packet(connection, pi);
}
static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
{
	/* Make sure we've acked all the TCP data associated
	 * with the data requests being unplugged */
	drbd_tcp_quickack(connection->data.socket);

	return 0;
}
static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_block_desc *p = pi->data;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	switch (device->state.conn) {
	case C_WF_SYNC_UUID:
	case C_WF_BITMAP_T:
	case C_BEHIND:
		break;
	default:
		drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
				drbd_conn_str(device->state.conn));
	}

	drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));

	return 0;
}
static int receive_rs_deallocated(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct p_block_desc *p = pi->data;
	struct drbd_device *device;
	sector_t sector;
	int size, err = 0;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	dec_rs_pending(device);

	if (get_ldev(device)) {
		struct drbd_peer_request *peer_req;
		const int op = REQ_OP_WRITE_ZEROES;

		peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER, sector,
					       size, 0, GFP_NOIO);
		if (!peer_req) {
			put_ldev(device);
			return -ENOMEM;
		}

		peer_req->w.cb = e_end_resync_block;
		peer_req->submit_jif = jiffies;
		peer_req->flags |= EE_IS_TRIM;

		spin_lock_irq(&device->resource->req_lock);
		list_add_tail(&peer_req->w.list, &device->sync_ee);
		spin_unlock_irq(&device->resource->req_lock);

		atomic_add(pi->size >> 9, &device->rs_sect_ev);
		err = drbd_submit_peer_request(device, peer_req, op, 0, DRBD_FAULT_RS_WR);

		if (err) {
			spin_lock_irq(&device->resource->req_lock);
			list_del(&peer_req->w.list);
			spin_unlock_irq(&device->resource->req_lock);

			drbd_free_peer_req(device, peer_req);
			put_ldev(device);
			err = 0;
			goto fail;
		}

		inc_unacked(device);

		/* No put_ldev() here. Gets called in drbd_endio_write_sec_final(),
		   as well as drbd_rs_complete_io() */
	} else {
	fail:
		drbd_rs_complete_io(device, sector);
		drbd_send_ack_ex(peer_device, P_NEG_ACK, sector, size, ID_SYNCER);
	}

	atomic_add(size >> 9, &device->rs_sect_in);

	return err;
}
struct data_cmd {
	int expect_payload;
	unsigned int pkt_size;
	int (*fn)(struct drbd_connection *, struct packet_info *);
};

static struct data_cmd drbd_cmd_handler[] = {
	[P_DATA]	    = { 1, sizeof(struct p_data), receive_Data },
	[P_DATA_REPLY]	    = { 1, sizeof(struct p_data), receive_DataReply },
	[P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply },
	[P_BARRIER]	    = { 0, sizeof(struct p_barrier), receive_Barrier },
	[P_BITMAP]	    = { 1, 0, receive_bitmap },
	[P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap },
	[P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
	[P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
	[P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
	[P_SYNC_PARAM]	    = { 1, 0, receive_SyncParam },
	[P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
	[P_PROTOCOL]	    = { 1, sizeof(struct p_protocol), receive_protocol },
	[P_UUIDS]	    = { 0, sizeof(struct p_uuids), receive_uuids },
	[P_SIZES]	    = { 0, sizeof(struct p_sizes), receive_sizes },
	[P_STATE]	    = { 0, sizeof(struct p_state), receive_state },
	[P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
	[P_SYNC_UUID]	    = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
	[P_OV_REQUEST]	    = { 0, sizeof(struct p_block_req), receive_DataRequest },
	[P_OV_REPLY]	    = { 1, sizeof(struct p_block_req), receive_DataRequest },
	[P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
	[P_RS_THIN_REQ]     = { 0, sizeof(struct p_block_req), receive_DataRequest },
	[P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
	[P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
	[P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
	[P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
	[P_TRIM]	    = { 0, sizeof(struct p_trim), receive_Data },
	[P_RS_DEALLOCATED]  = { 0, sizeof(struct p_block_desc), receive_rs_deallocated },
	[P_WSAME]	    = { 1, sizeof(struct p_wsame), receive_Data },
};
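/*
 * The table above is indexed directly by the packet type received on the
 * data socket.  "expect_payload" says whether pi.size may legitimately
 * exceed the fixed sub-header size; drbdd() below enforces that, reads the
 * fixed part into the preallocated receive buffer, and leaves any (possibly
 * large) payload for the handler itself to consume.
 */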
static void drbdd(struct drbd_connection *connection)
{
	struct packet_info pi;
	size_t shs; /* sub header size */
	int err;

	while (get_t_state(&connection->receiver) == RUNNING) {
		struct data_cmd const *cmd;

		drbd_thread_current_set_cpu(&connection->receiver);
		update_receiver_timing_details(connection, drbd_recv_header);
		if (drbd_recv_header(connection, &pi))
			goto err_out;

		cmd = &drbd_cmd_handler[pi.cmd];
		if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
			drbd_err(connection, "Unexpected data packet %s (0x%04x)",
				 cmdname(pi.cmd), pi.cmd);
			goto err_out;
		}

		shs = cmd->pkt_size;
		if (pi.cmd == P_SIZES && connection->agreed_features & DRBD_FF_WSAME)
			shs += sizeof(struct o_qlim);
		if (pi.size > shs && !cmd->expect_payload) {
			drbd_err(connection, "No payload expected %s l:%d\n",
				 cmdname(pi.cmd), pi.size);
			goto err_out;
		}
		if (pi.size < shs) {
			drbd_err(connection, "%s: unexpected packet size, expected:%d received:%d\n",
				 cmdname(pi.cmd), (int)shs, pi.size);
			goto err_out;
		}

		if (shs) {
			update_receiver_timing_details(connection, drbd_recv_all_warn);
			err = drbd_recv_all_warn(connection, pi.data, shs);
			if (err)
				goto err_out;
			pi.size -= shs;
		}

		update_receiver_timing_details(connection, cmd->fn);
		err = cmd->fn(connection, &pi);
		if (err) {
			drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
				 cmdname(pi.cmd), err, pi.size);
			goto err_out;
		}
	}
	return;

    err_out:
	conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
}
static void conn_disconnect(struct drbd_connection *connection)
{
	struct drbd_peer_device *peer_device;
	enum drbd_conns oc;
	int vnr;

	if (connection->cstate == C_STANDALONE)
		return;

	/* We are about to start the cleanup after connection loss.
	 * Make sure drbd_make_request knows about that.
	 * Usually we should be in some network failure state already,
	 * but just in case we are not, we fix it up here.
	 */
	conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);

	/* ack_receiver does not clean up anything. it must not interfere, either */
	drbd_thread_stop(&connection->ack_receiver);
	if (connection->ack_sender) {
		destroy_workqueue(connection->ack_sender);
		connection->ack_sender = NULL;
	}
	drbd_free_sock(connection);

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;
		kref_get(&device->kref);
		rcu_read_unlock();
		drbd_disconnected(peer_device);
		kref_put(&device->kref, drbd_destroy_device);
		rcu_read_lock();
	}
	rcu_read_unlock();

	if (!list_empty(&connection->current_epoch->list))
		drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
	atomic_set(&connection->current_epoch->epoch_size, 0);
	connection->send.seen_any_write_yet = false;

	drbd_info(connection, "Connection closed\n");

	if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
		conn_try_outdate_peer_async(connection);

	spin_lock_irq(&connection->resource->req_lock);
	oc = connection->cstate;
	if (oc >= C_UNCONNECTED)
		_conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);

	spin_unlock_irq(&connection->resource->req_lock);

	if (oc == C_DISCONNECTING)
		conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
}
static int drbd_disconnected(struct drbd_peer_device *peer_device)
{
	struct drbd_device *device = peer_device->device;
	unsigned int i;

	/* wait for current activity to cease. */
	spin_lock_irq(&device->resource->req_lock);
	_drbd_wait_ee_list_empty(device, &device->active_ee);
	_drbd_wait_ee_list_empty(device, &device->sync_ee);
	_drbd_wait_ee_list_empty(device, &device->read_ee);
	spin_unlock_irq(&device->resource->req_lock);

	/* We do not have data structures that would allow us to
	 * get the rs_pending_cnt down to 0 again.
	 *  * On C_SYNC_TARGET we do not have any data structures describing
	 *    the pending RSDataRequest's we have sent.
	 *  * On C_SYNC_SOURCE there is no data structure that tracks
	 *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
	 *  And no, it is not the sum of the reference counts in the
	 *  resync_LRU. The resync_LRU tracks the whole operation including
	 *  the disk-IO, while the rs_pending_cnt only tracks the blocks
	 *  on the fly. */
	drbd_rs_cancel_all(device);
	device->rs_total = 0;
	device->rs_failed = 0;
	atomic_set(&device->rs_pending_cnt, 0);
	wake_up(&device->misc_wait);

	del_timer_sync(&device->resync_timer);
	resync_timer_fn((unsigned long)device);

	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
	 * w_make_resync_request etc. which may still be on the worker queue
	 * to be "canceled" */
	drbd_flush_workqueue(&peer_device->connection->sender_work);

	drbd_finish_peer_reqs(device);

	/* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
	   might have issued a work again. The one before drbd_finish_peer_reqs() is
	   necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
	drbd_flush_workqueue(&peer_device->connection->sender_work);

	/* need to do it again, drbd_finish_peer_reqs() may have populated it
	 * again via drbd_try_clear_on_disk_bm(). */
	drbd_rs_cancel_all(device);

	kfree(device->p_uuid);
	device->p_uuid = NULL;

	if (!drbd_suspended(device))
		tl_clear(peer_device->connection);

	drbd_md_sync(device);

	if (get_ldev(device)) {
		drbd_bitmap_io(device, &drbd_bm_write_copy_pages,
				"write from disconnected", BM_LOCKED_CHANGE_ALLOWED);
		put_ldev(device);
	}

	/* tcp_close and release of sendpage pages can be deferred.  I don't
	 * want to use SO_LINGER, because apparently it can be deferred for
	 * more than 20 seconds (longest time I checked).
	 *
	 * Actually we don't care for exactly when the network stack does its
	 * put_page(), but release our reference on these pages right here.
	 */
	i = drbd_free_peer_reqs(device, &device->net_ee);
	if (i)
		drbd_info(device, "net_ee not empty, killed %u entries\n", i);
	i = atomic_read(&device->pp_in_use_by_net);
	if (i)
		drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
	i = atomic_read(&device->pp_in_use);
	if (i)
		drbd_info(device, "pp_in_use = %d, expected 0\n", i);

	D_ASSERT(device, list_empty(&device->read_ee));
	D_ASSERT(device, list_empty(&device->active_ee));
	D_ASSERT(device, list_empty(&device->sync_ee));
	D_ASSERT(device, list_empty(&device->done_ee));

	return 0;
}
/*
 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
 * we can agree on is stored in agreed_pro_version.
 *
 * feature flags and the reserved array should be enough room for future
 * enhancements of the handshake protocol, and possible plugins...
 *
 * for now, they are expected to be zero, but ignored.
 */
static int drbd_send_features(struct drbd_connection *connection)
{
	struct drbd_socket *sock;
	struct p_connection_features *p;

	sock = &connection->data;
	p = conn_prepare_command(connection, sock);
	if (!p)
		return -EIO;
	memset(p, 0, sizeof(*p));
	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
	p->feature_flags = cpu_to_be32(PRO_FEATURES);
	return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
}
/*
 * return values:
 *   1 yes, we have a valid connection
 *   0 oops, did not work out, please try again
 *  -1 peer talks different language,
 *     no point in trying again, please go standalone.
 */
static int drbd_do_features(struct drbd_connection *connection)
{
	/* ASSERT current == connection->receiver ... */
	struct p_connection_features *p;
	const int expect = sizeof(struct p_connection_features);
	struct packet_info pi;
	int err;

	err = drbd_send_features(connection);
	if (err)
		return 0;

	err = drbd_recv_header(connection, &pi);
	if (err)
		return 0;

	if (pi.cmd != P_CONNECTION_FEATURES) {
		drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
			 cmdname(pi.cmd), pi.cmd);
		return -1;
	}

	if (pi.size != expect) {
		drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
			 expect, pi.size);
		return -1;
	}

	p = pi.data;
	err = drbd_recv_all_warn(connection, p, expect);
	if (err)
		return 0;

	p->protocol_min = be32_to_cpu(p->protocol_min);
	p->protocol_max = be32_to_cpu(p->protocol_max);
	if (p->protocol_max == 0)
		p->protocol_max = p->protocol_min;

	if (PRO_VERSION_MAX < p->protocol_min ||
	    PRO_VERSION_MIN > p->protocol_max)
		goto incompat;

	connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
	connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);

	drbd_info(connection, "Handshake successful: "
	     "Agreed network protocol version %d\n", connection->agreed_pro_version);

	drbd_info(connection, "Feature flags enabled on protocol level: 0x%x%s%s%s.\n",
		  connection->agreed_features,
		  connection->agreed_features & DRBD_FF_TRIM ? " TRIM" : "",
		  connection->agreed_features & DRBD_FF_THIN_RESYNC ? " THIN_RESYNC" : "",
		  connection->agreed_features & DRBD_FF_WSAME ? " WRITE_SAME" :
		  connection->agreed_features ? "" : " none");

	return 1;

 incompat:
	drbd_err(connection, "incompatible DRBD dialects: "
	    "I support %d-%d, peer supports %d-%d\n",
	    PRO_VERSION_MIN, PRO_VERSION_MAX,
	    p->protocol_min, p->protocol_max);
	return -1;
}
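/*
 * Example of the agreement rule above: if we supported, say, protocols
 * 86..101 and the peer reported min 90 / max 96, the ranges overlap and
 * agreed_pro_version would become min(101, 96) = 96.  The feature-flag
 * word is simply ANDed, so only features both sides compiled in are used.
 */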
#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
static int drbd_do_auth(struct drbd_connection *connection)
{
	drbd_err(connection, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
	drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
	return -1;
}
#else
#define CHALLENGE_LEN 64

/* Return value:
	1 - auth succeeded,
	0 - failed, try again (network error),
	-1 - auth failed, don't try again.
*/

static int drbd_do_auth(struct drbd_connection *connection)
{
	struct drbd_socket *sock;
	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
	char *response = NULL;
	char *right_response = NULL;
	char *peers_ch = NULL;
	unsigned int key_len;
	char secret[SHARED_SECRET_MAX]; /* 64 byte */
	unsigned int resp_size;
	SHASH_DESC_ON_STACK(desc, connection->cram_hmac_tfm);
	struct packet_info pi;
	struct net_conf *nc;
	int err, rv;

	/* FIXME: Put the challenge/response into the preallocated socket buffer.  */

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	key_len = strlen(nc->shared_secret);
	memcpy(secret, nc->shared_secret, key_len);
	rcu_read_unlock();

	desc->tfm = connection->cram_hmac_tfm;
	desc->flags = 0;

	rv = crypto_shash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
	if (rv) {
		drbd_err(connection, "crypto_shash_setkey() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	get_random_bytes(my_challenge, CHALLENGE_LEN);

	sock = &connection->data;
	if (!conn_prepare_command(connection, sock)) {
		rv = 0;
		goto fail;
	}
	rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
				my_challenge, CHALLENGE_LEN);
	if (!rv)
		goto fail;

	err = drbd_recv_header(connection, &pi);
	if (err) {
		rv = 0;
		goto fail;
	}

	if (pi.cmd != P_AUTH_CHALLENGE) {
		drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
			 cmdname(pi.cmd), pi.cmd);
		rv = 0;
		goto fail;
	}

	if (pi.size > CHALLENGE_LEN * 2) {
		drbd_err(connection, "expected AuthChallenge payload too big.\n");
		rv = -1;
		goto fail;
	}

	if (pi.size < CHALLENGE_LEN) {
		drbd_err(connection, "AuthChallenge payload too small.\n");
		rv = -1;
		goto fail;
	}

	peers_ch = kmalloc(pi.size, GFP_NOIO);
	if (peers_ch == NULL) {
		drbd_err(connection, "kmalloc of peers_ch failed\n");
		rv = -1;
		goto fail;
	}

	err = drbd_recv_all_warn(connection, peers_ch, pi.size);
	if (err) {
		rv = 0;
		goto fail;
	}

	if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
		drbd_err(connection, "Peer presented the same challenge!\n");
		rv = -1;
		goto fail;
	}

	resp_size = crypto_shash_digestsize(connection->cram_hmac_tfm);
	response = kmalloc(resp_size, GFP_NOIO);
	if (response == NULL) {
		drbd_err(connection, "kmalloc of response failed\n");
		rv = -1;
		goto fail;
	}

	rv = crypto_shash_digest(desc, peers_ch, pi.size, response);
	if (rv) {
		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	if (!conn_prepare_command(connection, sock)) {
		rv = 0;
		goto fail;
	}
	rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
				response, resp_size);
	if (!rv)
		goto fail;

	err = drbd_recv_header(connection, &pi);
	if (err) {
		rv = 0;
		goto fail;
	}

	if (pi.cmd != P_AUTH_RESPONSE) {
		drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
			 cmdname(pi.cmd), pi.cmd);
		rv = 0;
		goto fail;
	}

	if (pi.size != resp_size) {
		drbd_err(connection, "expected AuthResponse payload of wrong size\n");
		rv = 0;
		goto fail;
	}

	err = drbd_recv_all_warn(connection, response, resp_size);
	if (err) {
		rv = 0;
		goto fail;
	}

	right_response = kmalloc(resp_size, GFP_NOIO);
	if (right_response == NULL) {
		drbd_err(connection, "kmalloc of right_response failed\n");
		rv = -1;
		goto fail;
	}

	rv = crypto_shash_digest(desc, my_challenge, CHALLENGE_LEN,
				 right_response);
	if (rv) {
		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	rv = !memcmp(response, right_response, resp_size);

	if (rv)
		drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
		     resp_size);
	else
		rv = -1;

 fail:
	kfree(peers_ch);
	kfree(response);
	kfree(right_response);
	shash_desc_zero(desc);

	return rv;
}
#endif
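/*
 * The exchange above is a symmetric challenge-response (CRAM style): each
 * side sends a 64 byte random challenge and must return
 * HMAC(shared_secret, peer's challenge).  Rejecting a peer that echoes our
 * own challenge back (the memcmp above) defeats the trivial reflection
 * attack in which an attacker replays our challenge to us and relays our
 * own answer.
 */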
int drbd_receiver(struct drbd_thread *thi)
{
	struct drbd_connection *connection = thi->connection;
	int h;

	drbd_info(connection, "receiver (re)started\n");

	do {
		h = conn_connect(connection);
		if (h == 0) {
			conn_disconnect(connection);
			schedule_timeout_interruptible(HZ);
		}
		if (h == -1) {
			drbd_warn(connection, "Discarding network configuration.\n");
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
		}
	} while (h == 0);

	if (h > 0)
		drbdd(connection);

	conn_disconnect(connection);

	drbd_info(connection, "receiver terminated\n");
	return 0;
}
/* ********* acknowledge sender ******** */

static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
{
	struct p_req_state_reply *p = pi->data;
	int retcode = be32_to_cpu(p->retcode);

	if (retcode >= SS_SUCCESS) {
		set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
	} else {
		set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
		drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
			 drbd_set_st_err_str(retcode), retcode);
	}
	wake_up(&connection->ping_wait);

	return 0;
}
static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_req_state_reply *p = pi->data;
	int retcode = be32_to_cpu(p->retcode);

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
		D_ASSERT(device, connection->agreed_pro_version < 100);
		return got_conn_RqSReply(connection, pi);
	}

	if (retcode >= SS_SUCCESS) {
		set_bit(CL_ST_CHG_SUCCESS, &device->flags);
	} else {
		set_bit(CL_ST_CHG_FAIL, &device->flags);
		drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
			 drbd_set_st_err_str(retcode), retcode);
	}
	wake_up(&device->state_wait);

	return 0;
}
static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
{
	return drbd_send_ping_ack(connection);
}
static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
{
	/* restore idle timeout */
	connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
	if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
		wake_up(&connection->ping_wait);

	return 0;
}
static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_block_ack *p = pi->data;
	sector_t sector = be64_to_cpu(p->sector);
	int blksize = be32_to_cpu(p->blksize);

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);

	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));

	if (get_ldev(device)) {
		drbd_rs_complete_io(device, sector);
		drbd_set_in_sync(device, sector, blksize);
		/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
		device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
		put_ldev(device);
	}
	dec_rs_pending(device);
	atomic_add(blksize >> 9, &device->rs_sect_in);

	return 0;
}
static int
validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
			      struct rb_root *root, const char *func,
			      enum drbd_req_event what, bool missing_ok)
{
	struct drbd_request *req;
	struct bio_and_error m;

	spin_lock_irq(&device->resource->req_lock);
	req = find_request(device, root, id, sector, missing_ok, func);
	if (unlikely(!req)) {
		spin_unlock_irq(&device->resource->req_lock);
		return -EIO;
	}
	__req_mod(req, what, &m);
	spin_unlock_irq(&device->resource->req_lock);

	if (m.bio)
		complete_master_bio(device, &m);
	return 0;
}
static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_block_ack *p = pi->data;
	sector_t sector = be64_to_cpu(p->sector);
	int blksize = be32_to_cpu(p->blksize);
	enum drbd_req_event what;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));

	if (p->block_id == ID_SYNCER) {
		drbd_set_in_sync(device, sector, blksize);
		dec_rs_pending(device);
		return 0;
	}
	switch (pi->cmd) {
	case P_RS_WRITE_ACK:
		what = WRITE_ACKED_BY_PEER_AND_SIS;
		break;
	case P_WRITE_ACK:
		what = WRITE_ACKED_BY_PEER;
		break;
	case P_RECV_ACK:
		what = RECV_ACKED_BY_PEER;
		break;
	case P_SUPERSEDED:
		what = CONFLICT_RESOLVED;
		break;
	case P_RETRY_WRITE:
		what = POSTPONE_WRITE;
		break;
	default:
		BUG();
	}

	return validate_req_change_req_state(device, p->block_id, sector,
					     &device->write_requests, __func__,
					     what, false);
}
static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_block_ack *p = pi->data;
	sector_t sector = be64_to_cpu(p->sector);
	int size = be32_to_cpu(p->blksize);
	int err;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));

	if (p->block_id == ID_SYNCER) {
		dec_rs_pending(device);
		drbd_rs_failed_io(device, sector, size);
		return 0;
	}

	err = validate_req_change_req_state(device, p->block_id, sector,
					    &device->write_requests, __func__,
					    NEG_ACKED, true);
	if (err) {
		/* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
		   The master bio might already be completed, therefore the
		   request is no longer in the collision hash. */
		/* In Protocol B we might already have got a P_RECV_ACK
		   but then get a P_NEG_ACK afterwards. */
		drbd_set_out_of_sync(device, sector, size);
	}
	return 0;
}
static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_block_ack *p = pi->data;
	sector_t sector = be64_to_cpu(p->sector);

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));

	drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
	    (unsigned long long)sector, be32_to_cpu(p->blksize));

	return validate_req_change_req_state(device, p->block_id, sector,
					     &device->read_requests, __func__,
					     NEG_ACKED, false);
}
static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	sector_t sector;
	int size;
	struct p_block_ack *p = pi->data;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));

	dec_rs_pending(device);

	if (get_ldev_if_state(device, D_FAILED)) {
		drbd_rs_complete_io(device, sector);
		switch (pi->cmd) {
		case P_NEG_RS_DREPLY:
			drbd_rs_failed_io(device, sector, size);
			break;
		case P_RS_CANCEL:
			break;
		default:
			BUG();
		}
		put_ldev(device);
	}

	return 0;
}
static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
{
	struct p_barrier_ack *p = pi->data;
	struct drbd_peer_device *peer_device;
	int vnr;

	tl_release(connection, p->barrier, be32_to_cpu(p->set_size));

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;

		if (device->state.conn == C_AHEAD &&
		    atomic_read(&device->ap_in_flight) == 0 &&
		    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
			device->start_resync_timer.expires = jiffies + HZ;
			add_timer(&device->start_resync_timer);
		}
	}
	rcu_read_unlock();

	return 0;
}
static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_block_ack *p = pi->data;
	struct drbd_device_work *dw;
	sector_t sector;
	int size;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));

	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
		drbd_ov_out_of_sync_found(device, sector, size);
	else
		ov_out_of_sync_print(device);

	if (!get_ldev(device))
		return 0;

	drbd_rs_complete_io(device, sector);
	dec_rs_pending(device);

	--device->ov_left;

	/* let's advance progress step marks only for every other megabyte */
	if ((device->ov_left & 0x200) == 0x200)
		drbd_advance_rs_marks(device, device->ov_left);

	if (device->ov_left == 0) {
		dw = kmalloc(sizeof(*dw), GFP_NOIO);
		if (dw) {
			dw->w.cb = w_ov_finished;
			dw->device = device;
			drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
		} else {
			drbd_err(device, "kmalloc(dw) failed.");
			ov_out_of_sync_print(device);
			drbd_resync_finished(device);
		}
	}
	put_ldev(device);
	return 0;
}
static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
{
	return 0;
}

struct meta_sock_cmd {
	size_t pkt_size;
	int (*fn)(struct drbd_connection *connection, struct packet_info *);
};
static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout)
{
	long t;
	struct net_conf *nc;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	t = ping_timeout ? nc->ping_timeo : nc->ping_int;
	rcu_read_unlock();

	t *= HZ;
	if (ping_timeout)
		t /= 10;

	connection->meta.socket->sk->sk_rcvtimeo = t;
}

static void set_ping_timeout(struct drbd_connection *connection)
{
	set_rcvtimeo(connection, 1);
}

static void set_idle_timeout(struct drbd_connection *connection)
{
	set_rcvtimeo(connection, 0);
}
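/*
 * Unit caveat, visible in set_rcvtimeo() above: ping_int is configured in
 * seconds, while ping_timeo is in tenths of a second, hence the extra
 * "t /= 10" on the ping-timeout path after multiplying by HZ.
 */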
static struct meta_sock_cmd ack_receiver_tbl[] = {
	[P_PING]	    = { 0, got_Ping },
	[P_PING_ACK]	    = { 0, got_PingAck },
	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_SUPERSEDED]	    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
	[P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
	[P_RS_CANCEL]	    = { sizeof(struct p_block_ack), got_NegRSDReply },
	[P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
	[P_RETRY_WRITE]	    = { sizeof(struct p_block_ack), got_BlockAck },
};
int drbd_ack_receiver(struct drbd_thread *thi)
{
	struct drbd_connection *connection = thi->connection;
	struct meta_sock_cmd *cmd = NULL;
	struct packet_info pi;
	unsigned long pre_recv_jif;
	int rv;
	void *buf    = connection->meta.rbuf;
	int received = 0;
	unsigned int header_size = drbd_header_size(connection);
	int expect   = header_size;
	bool ping_timeout_active = false;
	struct sched_param param = { .sched_priority = 2 };

	rv = sched_setscheduler(current, SCHED_RR, &param);
	if (rv < 0)
		drbd_err(connection, "drbd_ack_receiver: ERROR set priority, ret=%d\n", rv);

	while (get_t_state(thi) == RUNNING) {
		drbd_thread_current_set_cpu(thi);

		conn_reclaim_net_peer_reqs(connection);

		if (test_and_clear_bit(SEND_PING, &connection->flags)) {
			if (drbd_send_ping(connection)) {
				drbd_err(connection, "drbd_send_ping has failed\n");
				goto reconnect;
			}
			set_ping_timeout(connection);
			ping_timeout_active = true;
		}

		pre_recv_jif = jiffies;
		rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);

		/* Note:
		 * -EINTR	 (on meta) we got a signal
		 * -EAGAIN	 (on meta) rcvtimeo expired
		 * -ECONNRESET	 other side closed the connection
		 * -ERESTARTSYS  (on data) we got a signal
		 * rv <  0	 other than above: unexpected error!
		 * rv == expected: full header or command
		 * rv  < expected: "woken" by signal during receive
		 * rv == 0	 : "connection shut down by peer"
		 */
		if (likely(rv > 0)) {
			received += rv;
			buf	 += rv;
		} else if (rv == 0) {
			if (test_bit(DISCONNECT_SENT, &connection->flags)) {
				long t;

				rcu_read_lock();
				t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
				rcu_read_unlock();

				t = wait_event_timeout(connection->ping_wait,
						       connection->cstate < C_WF_REPORT_PARAMS,
						       t);
				if (t)
					break;
			}
			drbd_err(connection, "meta connection shut down by peer.\n");
			goto reconnect;
		} else if (rv == -EAGAIN) {
			/* If the data socket received something meanwhile,
			 * that is good enough: peer is still alive. */
			if (time_after(connection->last_received, pre_recv_jif))
				continue;
			if (ping_timeout_active) {
				drbd_err(connection, "PingAck did not arrive in time.\n");
				goto reconnect;
			}
			set_bit(SEND_PING, &connection->flags);
			continue;
		} else if (rv == -EINTR) {
			/* maybe drbd_thread_stop(): the while condition will notice.
			 * maybe woken for send_ping: we'll send a ping above,
			 * and change the rcvtimeo */
			flush_signals(current);
			continue;
		} else {
			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
			goto reconnect;
		}

		if (received == expect && cmd == NULL) {
			if (decode_header(connection, connection->meta.rbuf, &pi))
				goto reconnect;
			cmd = &ack_receiver_tbl[pi.cmd];
			if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) || !cmd->fn) {
				drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
					 cmdname(pi.cmd), pi.cmd);
				goto disconnect;
			}
			expect = header_size + cmd->pkt_size;
			if (pi.size != expect - header_size) {
				drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
					pi.cmd, pi.size);
				goto reconnect;
			}
		}
		if (received == expect) {
			int err;

			err = cmd->fn(connection, &pi);
			if (err) {
				drbd_err(connection, "%pf failed\n", cmd->fn);
				goto reconnect;
			}

			connection->last_received = jiffies;

			if (cmd == &ack_receiver_tbl[P_PING_ACK]) {
				set_idle_timeout(connection);
				ping_timeout_active = false;
			}

			buf	 = connection->meta.rbuf;
			received = 0;
			expect	 = header_size;
			cmd	 = NULL;
		}
	}

	if (0) {
reconnect:
		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
		conn_md_sync(connection);
	}
	if (0) {
disconnect:
		conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
	}

	drbd_info(connection, "ack_receiver terminated\n");

	return 0;
}
void drbd_send_acks_wf(struct work_struct *ws)
{
	struct drbd_peer_device *peer_device =
		container_of(ws, struct drbd_peer_device, send_acks_work);
	struct drbd_connection *connection = peer_device->connection;
	struct drbd_device *device = peer_device->device;
	struct net_conf *nc;
	int tcp_cork, err;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	tcp_cork = nc->tcp_cork;
	rcu_read_unlock();

	if (tcp_cork)
		drbd_tcp_cork(connection->meta.socket);

	err = drbd_finish_peer_reqs(device);
	kref_put(&device->kref, drbd_destroy_device);
	/* get is in drbd_endio_write_sec_final(). That is necessary to keep the
	   struct work_struct send_acks_work alive, which is in the peer_device object */

	if (err) {
		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
		return;
	}

	if (tcp_cork)
		drbd_tcp_uncork(connection->meta.socket);
}