dt-bindings: mtd: ingenic: Use standard ecc-engine property
[linux/fpc-iii.git] / drivers / block / drbd / drbd_receiver.c
blobc7ad88d91a09e7dd427be0792a48ef0f9989178b
1 /*
2 drbd_receiver.c
4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
13 any later version.
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26 #include <linux/module.h>
28 #include <linux/uaccess.h>
29 #include <net/sock.h>
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <uapi/linux/sched/types.h>
40 #include <linux/sched/signal.h>
41 #include <linux/pkt_sched.h>
42 #define __KERNEL_SYSCALLS__
43 #include <linux/unistd.h>
44 #include <linux/vmalloc.h>
45 #include <linux/random.h>
46 #include <linux/string.h>
47 #include <linux/scatterlist.h>
48 #include "drbd_int.h"
49 #include "drbd_protocol.h"
50 #include "drbd_req.h"
51 #include "drbd_vli.h"
/* Bitwise OR of the four DRBD_FF_* feature flags listed here.
 * NOTE(review): presumably the feature set offered during the feature
 * handshake (drbd_do_features) -- confirm against its definition. */
#define PRO_FEATURES \
	(DRBD_FF_TRIM | DRBD_FF_THIN_RESYNC | DRBD_FF_WSAME | DRBD_FF_WZEROES)
55 struct packet_info {
56 enum drbd_packet cmd;
57 unsigned int size;
58 unsigned int vnr;
59 void *data;
/* Result type of drbd_may_finish_epoch(). */
enum finish_epoch {
	FE_STILL_LIVE,
	FE_DESTROYED,
	FE_RECYCLED,
};
/* Forward declarations of file-local functions. */
static int drbd_do_features(struct drbd_connection *connection);
static int drbd_do_auth(struct drbd_connection *connection);
static int drbd_disconnected(struct drbd_peer_device *);
static void conn_wait_active_ee_empty(struct drbd_connection *connection);
static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
static int e_end_block(struct drbd_work *, int);

/* Allocation flags used with alloc_page() in __drbd_alloc_pages(). */
#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
/*
 * Some helper functions to deal with singly linked page lists,
 * page->private being our "next" pointer.
 */
83 /* If at least n pages are linked at head, get n pages off.
84 * Otherwise, don't modify head, and return NULL.
85 * Locking is the responsibility of the caller.
87 static struct page *page_chain_del(struct page **head, int n)
89 struct page *page;
90 struct page *tmp;
92 BUG_ON(!n);
93 BUG_ON(!head);
95 page = *head;
97 if (!page)
98 return NULL;
100 while (page) {
101 tmp = page_chain_next(page);
102 if (--n == 0)
103 break; /* found sufficient pages */
104 if (tmp == NULL)
105 /* insufficient pages, don't use any of them. */
106 return NULL;
107 page = tmp;
110 /* add end of list marker for the returned list */
111 set_page_private(page, 0);
112 /* actual return value, and adjustment of head */
113 page = *head;
114 *head = tmp;
115 return page;
118 /* may be used outside of locks to find the tail of a (usually short)
119 * "private" page chain, before adding it back to a global chain head
120 * with page_chain_add() under a spinlock. */
121 static struct page *page_chain_tail(struct page *page, int *len)
123 struct page *tmp;
124 int i = 1;
125 while ((tmp = page_chain_next(page)))
126 ++i, page = tmp;
127 if (len)
128 *len = i;
129 return page;
132 static int page_chain_free(struct page *page)
134 struct page *tmp;
135 int i = 0;
136 page_chain_for_each_safe(page, tmp) {
137 put_page(page);
138 ++i;
140 return i;
143 static void page_chain_add(struct page **head,
144 struct page *chain_first, struct page *chain_last)
146 #if 1
147 struct page *tmp;
148 tmp = page_chain_tail(chain_first, NULL);
149 BUG_ON(tmp != chain_last);
150 #endif
152 /* add chain to head */
153 set_page_private(chain_last, (unsigned long)*head);
154 *head = chain_first;
157 static struct page *__drbd_alloc_pages(struct drbd_device *device,
158 unsigned int number)
160 struct page *page = NULL;
161 struct page *tmp = NULL;
162 unsigned int i = 0;
164 /* Yes, testing drbd_pp_vacant outside the lock is racy.
165 * So what. It saves a spin_lock. */
166 if (drbd_pp_vacant >= number) {
167 spin_lock(&drbd_pp_lock);
168 page = page_chain_del(&drbd_pp_pool, number);
169 if (page)
170 drbd_pp_vacant -= number;
171 spin_unlock(&drbd_pp_lock);
172 if (page)
173 return page;
176 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
177 * "criss-cross" setup, that might cause write-out on some other DRBD,
178 * which in turn might block on the other node at this very place. */
179 for (i = 0; i < number; i++) {
180 tmp = alloc_page(GFP_TRY);
181 if (!tmp)
182 break;
183 set_page_private(tmp, (unsigned long)page);
184 page = tmp;
187 if (i == number)
188 return page;
190 /* Not enough pages immediately available this time.
191 * No need to jump around here, drbd_alloc_pages will retry this
192 * function "soon". */
193 if (page) {
194 tmp = page_chain_tail(page, NULL);
195 spin_lock(&drbd_pp_lock);
196 page_chain_add(&drbd_pp_pool, page, tmp);
197 drbd_pp_vacant += i;
198 spin_unlock(&drbd_pp_lock);
200 return NULL;
203 static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
204 struct list_head *to_be_freed)
206 struct drbd_peer_request *peer_req, *tmp;
208 /* The EEs are always appended to the end of the list. Since
209 they are sent in order over the wire, they have to finish
210 in order. As soon as we see the first not finished we can
211 stop to examine the list... */
213 list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
214 if (drbd_peer_req_has_active_page(peer_req))
215 break;
216 list_move(&peer_req->w.list, to_be_freed);
220 static void drbd_reclaim_net_peer_reqs(struct drbd_device *device)
222 LIST_HEAD(reclaimed);
223 struct drbd_peer_request *peer_req, *t;
225 spin_lock_irq(&device->resource->req_lock);
226 reclaim_finished_net_peer_reqs(device, &reclaimed);
227 spin_unlock_irq(&device->resource->req_lock);
228 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
229 drbd_free_net_peer_req(device, peer_req);
232 static void conn_reclaim_net_peer_reqs(struct drbd_connection *connection)
234 struct drbd_peer_device *peer_device;
235 int vnr;
237 rcu_read_lock();
238 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
239 struct drbd_device *device = peer_device->device;
240 if (!atomic_read(&device->pp_in_use_by_net))
241 continue;
243 kref_get(&device->kref);
244 rcu_read_unlock();
245 drbd_reclaim_net_peer_reqs(device);
246 kref_put(&device->kref, drbd_destroy_device);
247 rcu_read_lock();
249 rcu_read_unlock();
253 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
254 * @device: DRBD device.
255 * @number: number of pages requested
256 * @retry: whether to retry, if not enough pages are available right now
258 * Tries to allocate number pages, first from our own page pool, then from
259 * the kernel.
260 * Possibly retry until DRBD frees sufficient pages somewhere else.
262 * If this allocation would exceed the max_buffers setting, we throttle
263 * allocation (schedule_timeout) to give the system some room to breathe.
265 * We do not use max-buffers as hard limit, because it could lead to
266 * congestion and further to a distributed deadlock during online-verify or
267 * (checksum based) resync, if the max-buffers, socket buffer sizes and
268 * resync-rate settings are mis-configured.
270 * Returns a page chain linked via page->private.
272 struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
273 bool retry)
275 struct drbd_device *device = peer_device->device;
276 struct page *page = NULL;
277 struct net_conf *nc;
278 DEFINE_WAIT(wait);
279 unsigned int mxb;
281 rcu_read_lock();
282 nc = rcu_dereference(peer_device->connection->net_conf);
283 mxb = nc ? nc->max_buffers : 1000000;
284 rcu_read_unlock();
286 if (atomic_read(&device->pp_in_use) < mxb)
287 page = __drbd_alloc_pages(device, number);
289 /* Try to keep the fast path fast, but occasionally we need
290 * to reclaim the pages we lended to the network stack. */
291 if (page && atomic_read(&device->pp_in_use_by_net) > 512)
292 drbd_reclaim_net_peer_reqs(device);
294 while (page == NULL) {
295 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
297 drbd_reclaim_net_peer_reqs(device);
299 if (atomic_read(&device->pp_in_use) < mxb) {
300 page = __drbd_alloc_pages(device, number);
301 if (page)
302 break;
305 if (!retry)
306 break;
308 if (signal_pending(current)) {
309 drbd_warn(device, "drbd_alloc_pages interrupted!\n");
310 break;
313 if (schedule_timeout(HZ/10) == 0)
314 mxb = UINT_MAX;
316 finish_wait(&drbd_pp_wait, &wait);
318 if (page)
319 atomic_add(number, &device->pp_in_use);
320 return page;
323 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
324 * Is also used from inside an other spin_lock_irq(&resource->req_lock);
325 * Either links the page chain back to the global pool,
326 * or returns all pages to the system. */
327 static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
329 atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
330 int i;
332 if (page == NULL)
333 return;
335 if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * drbd_minor_count)
336 i = page_chain_free(page);
337 else {
338 struct page *tmp;
339 tmp = page_chain_tail(page, &i);
340 spin_lock(&drbd_pp_lock);
341 page_chain_add(&drbd_pp_pool, page, tmp);
342 drbd_pp_vacant += i;
343 spin_unlock(&drbd_pp_lock);
345 i = atomic_sub_return(i, a);
346 if (i < 0)
347 drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
348 is_net ? "pp_in_use_by_net" : "pp_in_use", i);
349 wake_up(&drbd_pp_wait);
/*
You need to hold the req_lock:
 _drbd_wait_ee_list_empty()

You must not have the req_lock:
 drbd_free_peer_req()
 drbd_alloc_peer_req()
 drbd_free_peer_reqs()
 drbd_ee_fix_bhs()
 drbd_finish_peer_reqs()
 drbd_clear_done_ee()
 drbd_wait_ee_list_empty()
*/
366 /* normal: payload_size == request size (bi_size)
367 * w_same: payload_size == logical_block_size
368 * trim: payload_size == 0 */
369 struct drbd_peer_request *
370 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
371 unsigned int request_size, unsigned int payload_size, gfp_t gfp_mask) __must_hold(local)
373 struct drbd_device *device = peer_device->device;
374 struct drbd_peer_request *peer_req;
375 struct page *page = NULL;
376 unsigned nr_pages = (payload_size + PAGE_SIZE -1) >> PAGE_SHIFT;
378 if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
379 return NULL;
381 peer_req = mempool_alloc(&drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
382 if (!peer_req) {
383 if (!(gfp_mask & __GFP_NOWARN))
384 drbd_err(device, "%s: allocation failed\n", __func__);
385 return NULL;
388 if (nr_pages) {
389 page = drbd_alloc_pages(peer_device, nr_pages,
390 gfpflags_allow_blocking(gfp_mask));
391 if (!page)
392 goto fail;
395 memset(peer_req, 0, sizeof(*peer_req));
396 INIT_LIST_HEAD(&peer_req->w.list);
397 drbd_clear_interval(&peer_req->i);
398 peer_req->i.size = request_size;
399 peer_req->i.sector = sector;
400 peer_req->submit_jif = jiffies;
401 peer_req->peer_device = peer_device;
402 peer_req->pages = page;
404 * The block_id is opaque to the receiver. It is not endianness
405 * converted, and sent back to the sender unchanged.
407 peer_req->block_id = id;
409 return peer_req;
411 fail:
412 mempool_free(peer_req, &drbd_ee_mempool);
413 return NULL;
416 void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
417 int is_net)
419 might_sleep();
420 if (peer_req->flags & EE_HAS_DIGEST)
421 kfree(peer_req->digest);
422 drbd_free_pages(device, peer_req->pages, is_net);
423 D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
424 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
425 if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
426 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
427 drbd_al_complete_io(device, &peer_req->i);
429 mempool_free(peer_req, &drbd_ee_mempool);
432 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
434 LIST_HEAD(work_list);
435 struct drbd_peer_request *peer_req, *t;
436 int count = 0;
437 int is_net = list == &device->net_ee;
439 spin_lock_irq(&device->resource->req_lock);
440 list_splice_init(list, &work_list);
441 spin_unlock_irq(&device->resource->req_lock);
443 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
444 __drbd_free_peer_req(device, peer_req, is_net);
445 count++;
447 return count;
451 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
453 static int drbd_finish_peer_reqs(struct drbd_device *device)
455 LIST_HEAD(work_list);
456 LIST_HEAD(reclaimed);
457 struct drbd_peer_request *peer_req, *t;
458 int err = 0;
460 spin_lock_irq(&device->resource->req_lock);
461 reclaim_finished_net_peer_reqs(device, &reclaimed);
462 list_splice_init(&device->done_ee, &work_list);
463 spin_unlock_irq(&device->resource->req_lock);
465 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
466 drbd_free_net_peer_req(device, peer_req);
468 /* possible callbacks here:
469 * e_end_block, and e_end_resync_block, e_send_superseded.
470 * all ignore the last argument.
472 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
473 int err2;
475 /* list_del not necessary, next/prev members not touched */
476 err2 = peer_req->w.cb(&peer_req->w, !!err);
477 if (!err)
478 err = err2;
479 drbd_free_peer_req(device, peer_req);
481 wake_up(&device->ee_wait);
483 return err;
486 static void _drbd_wait_ee_list_empty(struct drbd_device *device,
487 struct list_head *head)
489 DEFINE_WAIT(wait);
491 /* avoids spin_lock/unlock
492 * and calling prepare_to_wait in the fast path */
493 while (!list_empty(head)) {
494 prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
495 spin_unlock_irq(&device->resource->req_lock);
496 io_schedule();
497 finish_wait(&device->ee_wait, &wait);
498 spin_lock_irq(&device->resource->req_lock);
502 static void drbd_wait_ee_list_empty(struct drbd_device *device,
503 struct list_head *head)
505 spin_lock_irq(&device->resource->req_lock);
506 _drbd_wait_ee_list_empty(device, head);
507 spin_unlock_irq(&device->resource->req_lock);
510 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
512 struct kvec iov = {
513 .iov_base = buf,
514 .iov_len = size,
516 struct msghdr msg = {
517 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
519 iov_iter_kvec(&msg.msg_iter, READ, &iov, 1, size);
520 return sock_recvmsg(sock, &msg, msg.msg_flags);
523 static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
525 int rv;
527 rv = drbd_recv_short(connection->data.socket, buf, size, 0);
529 if (rv < 0) {
530 if (rv == -ECONNRESET)
531 drbd_info(connection, "sock was reset by peer\n");
532 else if (rv != -ERESTARTSYS)
533 drbd_err(connection, "sock_recvmsg returned %d\n", rv);
534 } else if (rv == 0) {
535 if (test_bit(DISCONNECT_SENT, &connection->flags)) {
536 long t;
537 rcu_read_lock();
538 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
539 rcu_read_unlock();
541 t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
543 if (t)
544 goto out;
546 drbd_info(connection, "sock was shut down by peer\n");
549 if (rv != size)
550 conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
552 out:
553 return rv;
556 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
558 int err;
560 err = drbd_recv(connection, buf, size);
561 if (err != size) {
562 if (err >= 0)
563 err = -EIO;
564 } else
565 err = 0;
566 return err;
569 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
571 int err;
573 err = drbd_recv_all(connection, buf, size);
574 if (err && !signal_pending(current))
575 drbd_warn(connection, "short read (expected size %d)\n", (int)size);
576 return err;
579 /* quoting tcp(7):
580 * On individual connections, the socket buffer size must be set prior to the
581 * listen(2) or connect(2) calls in order to have it take effect.
582 * This is our wrapper to do so.
584 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
585 unsigned int rcv)
587 /* open coded SO_SNDBUF, SO_RCVBUF */
588 if (snd) {
589 sock->sk->sk_sndbuf = snd;
590 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
592 if (rcv) {
593 sock->sk->sk_rcvbuf = rcv;
594 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
/* Try to actively connect to the peer.
 *
 * Creates a TCP socket, binds it to the configured local address (with
 * port 0) and connects it to the configured peer address, using
 * connect_int seconds as send/receive timeout.
 *
 * Returns the connected socket, or NULL on failure or if there is no
 * net_conf.  Errors that just mean "peer not reachable (yet)" are
 * silent; failures before the connect() call request C_DISCONNECTING
 * unless the error is one of those transient ones.
 */
static struct socket *drbd_try_connect(struct drbd_connection *connection)
{
	const char *what;
	struct socket *sock;
	struct sockaddr_in6 src_in6;
	struct sockaddr_in6 peer_in6;
	struct net_conf *nc;
	int err, peer_addr_len, my_addr_len;
	int sndbuf_size, rcvbuf_size, connect_int;
	int disconnect_on_error = 1;

	/* copy what we need from net_conf, so we can drop the RCU read lock */
	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return NULL;
	}
	sndbuf_size = nc->sndbuf_size;
	rcvbuf_size = nc->rcvbuf_size;
	connect_int = nc->connect_int;
	rcu_read_unlock();

	my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
	memcpy(&src_in6, &connection->my_addr, my_addr_len);

	if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
		src_in6.sin6_port = 0;
	else
		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

	/* clamp to the buffer we actually copy into */
	peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(peer_in6));
	memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);

	what = "sock_create_kern";
	err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &sock);
	if (err < 0) {
		sock = NULL;
		goto out;
	}

	sock->sk->sk_rcvtimeo =
	sock->sk->sk_sndtimeo = connect_int * HZ;
	drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);

       /* explicitly bind to the configured IP as source IP
	*  for the outgoing connections.
	*  This is needed for multihomed hosts and to be
	*  able to use lo: interfaces for drbd.
	* Make sure to use 0 as port number, so linux selects
	*  a free one dynamically.
	*/
	what = "bind before connect";
	err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
	if (err < 0)
		goto out;

	/* connect may fail, peer not yet available.
	 * stay C_WF_CONNECTION, don't go Disconnecting! */
	disconnect_on_error = 0;
	what = "connect";
	err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);

out:
	if (err < 0) {
		if (sock) {
			sock_release(sock);
			sock = NULL;
		}
		switch (-err) {
			/* timeout, busy, signal pending */
		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
		case EINTR: case ERESTARTSYS:
			/* peer not (yet) available, network problem */
		case ECONNREFUSED: case ENETUNREACH:
		case EHOSTDOWN:    case EHOSTUNREACH:
			disconnect_on_error = 0;
			break;
		default:
			drbd_err(connection, "%s failed, err = %d\n", what, err);
		}
		if (disconnect_on_error)
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
	}

	return sock;
}
686 struct accept_wait_data {
687 struct drbd_connection *connection;
688 struct socket *s_listen;
689 struct completion door_bell;
690 void (*original_sk_state_change)(struct sock *sk);
694 static void drbd_incoming_connection(struct sock *sk)
696 struct accept_wait_data *ad = sk->sk_user_data;
697 void (*state_change)(struct sock *sk);
699 state_change = ad->original_sk_state_change;
700 if (sk->sk_state == TCP_ESTABLISHED)
701 complete(&ad->door_bell);
702 state_change(sk);
705 static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
707 int err, sndbuf_size, rcvbuf_size, my_addr_len;
708 struct sockaddr_in6 my_addr;
709 struct socket *s_listen;
710 struct net_conf *nc;
711 const char *what;
713 rcu_read_lock();
714 nc = rcu_dereference(connection->net_conf);
715 if (!nc) {
716 rcu_read_unlock();
717 return -EIO;
719 sndbuf_size = nc->sndbuf_size;
720 rcvbuf_size = nc->rcvbuf_size;
721 rcu_read_unlock();
723 my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
724 memcpy(&my_addr, &connection->my_addr, my_addr_len);
726 what = "sock_create_kern";
727 err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
728 SOCK_STREAM, IPPROTO_TCP, &s_listen);
729 if (err) {
730 s_listen = NULL;
731 goto out;
734 s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
735 drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
737 what = "bind before listen";
738 err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
739 if (err < 0)
740 goto out;
742 ad->s_listen = s_listen;
743 write_lock_bh(&s_listen->sk->sk_callback_lock);
744 ad->original_sk_state_change = s_listen->sk->sk_state_change;
745 s_listen->sk->sk_state_change = drbd_incoming_connection;
746 s_listen->sk->sk_user_data = ad;
747 write_unlock_bh(&s_listen->sk->sk_callback_lock);
749 what = "listen";
750 err = s_listen->ops->listen(s_listen, 5);
751 if (err < 0)
752 goto out;
754 return 0;
755 out:
756 if (s_listen)
757 sock_release(s_listen);
758 if (err < 0) {
759 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
760 drbd_err(connection, "%s failed, err = %d\n", what, err);
761 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
765 return -EIO;
768 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
770 write_lock_bh(&sk->sk_callback_lock);
771 sk->sk_state_change = ad->original_sk_state_change;
772 sk->sk_user_data = NULL;
773 write_unlock_bh(&sk->sk_callback_lock);
776 static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
778 int timeo, connect_int, err = 0;
779 struct socket *s_estab = NULL;
780 struct net_conf *nc;
782 rcu_read_lock();
783 nc = rcu_dereference(connection->net_conf);
784 if (!nc) {
785 rcu_read_unlock();
786 return NULL;
788 connect_int = nc->connect_int;
789 rcu_read_unlock();
791 timeo = connect_int * HZ;
792 /* 28.5% random jitter */
793 timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
795 err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
796 if (err <= 0)
797 return NULL;
799 err = kernel_accept(ad->s_listen, &s_estab, 0);
800 if (err < 0) {
801 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
802 drbd_err(connection, "accept failed, err = %d\n", err);
803 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
807 if (s_estab)
808 unregister_state_change(s_estab->sk, ad);
810 return s_estab;
813 static int decode_header(struct drbd_connection *, void *, struct packet_info *);
815 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
816 enum drbd_packet cmd)
818 if (!conn_prepare_command(connection, sock))
819 return -EIO;
820 return conn_send_command(connection, sock, cmd, 0, NULL, 0);
823 static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
825 unsigned int header_size = drbd_header_size(connection);
826 struct packet_info pi;
827 struct net_conf *nc;
828 int err;
830 rcu_read_lock();
831 nc = rcu_dereference(connection->net_conf);
832 if (!nc) {
833 rcu_read_unlock();
834 return -EIO;
836 sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
837 rcu_read_unlock();
839 err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
840 if (err != header_size) {
841 if (err >= 0)
842 err = -EIO;
843 return err;
845 err = decode_header(connection, connection->data.rbuf, &pi);
846 if (err)
847 return err;
848 return pi.cmd;
852 * drbd_socket_okay() - Free the socket if its connection is not okay
853 * @sock: pointer to the pointer to the socket.
855 static bool drbd_socket_okay(struct socket **sock)
857 int rr;
858 char tb[4];
860 if (!*sock)
861 return false;
863 rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
865 if (rr > 0 || rr == -EAGAIN) {
866 return true;
867 } else {
868 sock_release(*sock);
869 *sock = NULL;
870 return false;
874 static bool connection_established(struct drbd_connection *connection,
875 struct socket **sock1,
876 struct socket **sock2)
878 struct net_conf *nc;
879 int timeout;
880 bool ok;
882 if (!*sock1 || !*sock2)
883 return false;
885 rcu_read_lock();
886 nc = rcu_dereference(connection->net_conf);
887 timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
888 rcu_read_unlock();
889 schedule_timeout_interruptible(timeout);
891 ok = drbd_socket_okay(sock1);
892 ok = drbd_socket_okay(sock2) && ok;
894 return ok;
897 /* Gets called if a connection is established, or if a new minor gets created
898 in a connection */
899 int drbd_connected(struct drbd_peer_device *peer_device)
901 struct drbd_device *device = peer_device->device;
902 int err;
904 atomic_set(&device->packet_seq, 0);
905 device->peer_seq = 0;
907 device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
908 &peer_device->connection->cstate_mutex :
909 &device->own_state_mutex;
911 err = drbd_send_sync_param(peer_device);
912 if (!err)
913 err = drbd_send_sizes(peer_device, 0, 0);
914 if (!err)
915 err = drbd_send_uuids(peer_device);
916 if (!err)
917 err = drbd_send_current_state(peer_device);
918 clear_bit(USE_DEGR_WFC_T, &device->flags);
919 clear_bit(RESIZE_PENDING, &device->flags);
920 atomic_set(&device->ap_in_flight, 0);
921 mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
922 return err;
926 * return values:
927 * 1 yes, we have a valid connection
928 * 0 oops, did not work out, please try again
929 * -1 peer talks different language,
930 * no point in trying again, please go standalone.
931 * -2 We do not have a network config...
933 static int conn_connect(struct drbd_connection *connection)
935 struct drbd_socket sock, msock;
936 struct drbd_peer_device *peer_device;
937 struct net_conf *nc;
938 int vnr, timeout, h;
939 bool discard_my_data, ok;
940 enum drbd_state_rv rv;
941 struct accept_wait_data ad = {
942 .connection = connection,
943 .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
946 clear_bit(DISCONNECT_SENT, &connection->flags);
947 if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
948 return -2;
950 mutex_init(&sock.mutex);
951 sock.sbuf = connection->data.sbuf;
952 sock.rbuf = connection->data.rbuf;
953 sock.socket = NULL;
954 mutex_init(&msock.mutex);
955 msock.sbuf = connection->meta.sbuf;
956 msock.rbuf = connection->meta.rbuf;
957 msock.socket = NULL;
959 /* Assume that the peer only understands protocol 80 until we know better. */
960 connection->agreed_pro_version = 80;
962 if (prepare_listen_socket(connection, &ad))
963 return 0;
965 do {
966 struct socket *s;
968 s = drbd_try_connect(connection);
969 if (s) {
970 if (!sock.socket) {
971 sock.socket = s;
972 send_first_packet(connection, &sock, P_INITIAL_DATA);
973 } else if (!msock.socket) {
974 clear_bit(RESOLVE_CONFLICTS, &connection->flags);
975 msock.socket = s;
976 send_first_packet(connection, &msock, P_INITIAL_META);
977 } else {
978 drbd_err(connection, "Logic error in conn_connect()\n");
979 goto out_release_sockets;
983 if (connection_established(connection, &sock.socket, &msock.socket))
984 break;
986 retry:
987 s = drbd_wait_for_connect(connection, &ad);
988 if (s) {
989 int fp = receive_first_packet(connection, s);
990 drbd_socket_okay(&sock.socket);
991 drbd_socket_okay(&msock.socket);
992 switch (fp) {
993 case P_INITIAL_DATA:
994 if (sock.socket) {
995 drbd_warn(connection, "initial packet S crossed\n");
996 sock_release(sock.socket);
997 sock.socket = s;
998 goto randomize;
1000 sock.socket = s;
1001 break;
1002 case P_INITIAL_META:
1003 set_bit(RESOLVE_CONFLICTS, &connection->flags);
1004 if (msock.socket) {
1005 drbd_warn(connection, "initial packet M crossed\n");
1006 sock_release(msock.socket);
1007 msock.socket = s;
1008 goto randomize;
1010 msock.socket = s;
1011 break;
1012 default:
1013 drbd_warn(connection, "Error receiving initial packet\n");
1014 sock_release(s);
1015 randomize:
1016 if (prandom_u32() & 1)
1017 goto retry;
1021 if (connection->cstate <= C_DISCONNECTING)
1022 goto out_release_sockets;
1023 if (signal_pending(current)) {
1024 flush_signals(current);
1025 smp_rmb();
1026 if (get_t_state(&connection->receiver) == EXITING)
1027 goto out_release_sockets;
1030 ok = connection_established(connection, &sock.socket, &msock.socket);
1031 } while (!ok);
1033 if (ad.s_listen)
1034 sock_release(ad.s_listen);
1036 sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1037 msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1039 sock.socket->sk->sk_allocation = GFP_NOIO;
1040 msock.socket->sk->sk_allocation = GFP_NOIO;
1042 sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
1043 msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
1045 /* NOT YET ...
1046 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
1047 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1048 * first set it to the P_CONNECTION_FEATURES timeout,
1049 * which we set to 4x the configured ping_timeout. */
1050 rcu_read_lock();
1051 nc = rcu_dereference(connection->net_conf);
1053 sock.socket->sk->sk_sndtimeo =
1054 sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
1056 msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1057 timeout = nc->timeout * HZ / 10;
1058 discard_my_data = nc->discard_my_data;
1059 rcu_read_unlock();
1061 msock.socket->sk->sk_sndtimeo = timeout;
1063 /* we don't want delays.
1064 * we use TCP_CORK where appropriate, though */
1065 drbd_tcp_nodelay(sock.socket);
1066 drbd_tcp_nodelay(msock.socket);
1068 connection->data.socket = sock.socket;
1069 connection->meta.socket = msock.socket;
1070 connection->last_received = jiffies;
1072 h = drbd_do_features(connection);
1073 if (h <= 0)
1074 return h;
1076 if (connection->cram_hmac_tfm) {
1077 /* drbd_request_state(device, NS(conn, WFAuth)); */
1078 switch (drbd_do_auth(connection)) {
1079 case -1:
1080 drbd_err(connection, "Authentication of peer failed\n");
1081 return -1;
1082 case 0:
1083 drbd_err(connection, "Authentication of peer failed, trying again.\n");
1084 return 0;
1088 connection->data.socket->sk->sk_sndtimeo = timeout;
1089 connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1091 if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1092 return -1;
1094 /* Prevent a race between resync-handshake and
1095 * being promoted to Primary.
1097 * Grab and release the state mutex, so we know that any current
1098 * drbd_set_role() is finished, and any incoming drbd_set_role
1099 * will see the STATE_SENT flag, and wait for it to be cleared.
1101 idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1102 mutex_lock(peer_device->device->state_mutex);
1104 /* avoid a race with conn_request_state( C_DISCONNECTING ) */
1105 spin_lock_irq(&connection->resource->req_lock);
1106 set_bit(STATE_SENT, &connection->flags);
1107 spin_unlock_irq(&connection->resource->req_lock);
1109 idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1110 mutex_unlock(peer_device->device->state_mutex);
1112 rcu_read_lock();
1113 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1114 struct drbd_device *device = peer_device->device;
1115 kref_get(&device->kref);
1116 rcu_read_unlock();
1118 if (discard_my_data)
1119 set_bit(DISCARD_MY_DATA, &device->flags);
1120 else
1121 clear_bit(DISCARD_MY_DATA, &device->flags);
1123 drbd_connected(peer_device);
1124 kref_put(&device->kref, drbd_destroy_device);
1125 rcu_read_lock();
1127 rcu_read_unlock();
1129 rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1130 if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1131 clear_bit(STATE_SENT, &connection->flags);
1132 return 0;
1135 drbd_thread_start(&connection->ack_receiver);
1136 /* opencoded create_singlethread_workqueue(),
1137 * to be able to use format string arguments */
1138 connection->ack_sender =
1139 alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name);
1140 if (!connection->ack_sender) {
1141 drbd_err(connection, "Failed to create workqueue ack_sender\n");
1142 return 0;
1145 mutex_lock(&connection->resource->conf_update);
1146 /* The discard_my_data flag is a single-shot modifier to the next
1147 * connection attempt, the handshake of which is now well underway.
1148 * No need for rcu style copying of the whole struct
1149 * just to clear a single value. */
1150 connection->net_conf->discard_my_data = 0;
1151 mutex_unlock(&connection->resource->conf_update);
1153 return h;
1155 out_release_sockets:
1156 if (ad.s_listen)
1157 sock_release(ad.s_listen);
1158 if (sock.socket)
1159 sock_release(sock.socket);
1160 if (msock.socket)
1161 sock_release(msock.socket);
1162 return -1;
1165 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1167 unsigned int header_size = drbd_header_size(connection);
1169 if (header_size == sizeof(struct p_header100) &&
1170 *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1171 struct p_header100 *h = header;
1172 if (h->pad != 0) {
1173 drbd_err(connection, "Header padding is not zero\n");
1174 return -EINVAL;
1176 pi->vnr = be16_to_cpu(h->volume);
1177 pi->cmd = be16_to_cpu(h->command);
1178 pi->size = be32_to_cpu(h->length);
1179 } else if (header_size == sizeof(struct p_header95) &&
1180 *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1181 struct p_header95 *h = header;
1182 pi->cmd = be16_to_cpu(h->command);
1183 pi->size = be32_to_cpu(h->length);
1184 pi->vnr = 0;
1185 } else if (header_size == sizeof(struct p_header80) &&
1186 *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1187 struct p_header80 *h = header;
1188 pi->cmd = be16_to_cpu(h->command);
1189 pi->size = be16_to_cpu(h->length);
1190 pi->vnr = 0;
1191 } else {
1192 drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1193 be32_to_cpu(*(__be32 *)header),
1194 connection->agreed_pro_version);
1195 return -EINVAL;
1197 pi->data = header + header_size;
1198 return 0;
1201 static void drbd_unplug_all_devices(struct drbd_connection *connection)
1203 if (current->plug == &connection->receiver_plug) {
1204 blk_finish_plug(&connection->receiver_plug);
1205 blk_start_plug(&connection->receiver_plug);
1206 } /* else: maybe just schedule() ?? */
1209 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1211 void *buffer = connection->data.rbuf;
1212 int err;
1214 err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1215 if (err)
1216 return err;
1218 err = decode_header(connection, buffer, pi);
1219 connection->last_received = jiffies;
1221 return err;
1224 static int drbd_recv_header_maybe_unplug(struct drbd_connection *connection, struct packet_info *pi)
1226 void *buffer = connection->data.rbuf;
1227 unsigned int size = drbd_header_size(connection);
1228 int err;
1230 err = drbd_recv_short(connection->data.socket, buffer, size, MSG_NOSIGNAL|MSG_DONTWAIT);
1231 if (err != size) {
1232 /* If we have nothing in the receive buffer now, to reduce
1233 * application latency, try to drain the backend queues as
1234 * quickly as possible, and let remote TCP know what we have
1235 * received so far. */
1236 if (err == -EAGAIN) {
1237 drbd_tcp_quickack(connection->data.socket);
1238 drbd_unplug_all_devices(connection);
1240 if (err > 0) {
1241 buffer += err;
1242 size -= err;
1244 err = drbd_recv_all_warn(connection, buffer, size);
1245 if (err)
1246 return err;
1249 err = decode_header(connection, connection->data.rbuf, pi);
1250 connection->last_received = jiffies;
1252 return err;
1254 /* This is blkdev_issue_flush, but asynchronous.
1255 * We want to submit to all component volumes in parallel,
1256 * then wait for all completions.
1258 struct issue_flush_context {
1259 atomic_t pending;
1260 int error;
1261 struct completion done;
/*
 * Per-bio bookkeeping of a single flush; allocated in submit_one_flush(),
 * handed over via bio->bi_private and freed in one_flush_endio().
 */
struct one_flush_context {
	struct drbd_device *device;		/* device whose backing device is flushed */
	struct issue_flush_context *ctx;	/* shared context of the whole flush run */
};
1268 static void one_flush_endio(struct bio *bio)
1270 struct one_flush_context *octx = bio->bi_private;
1271 struct drbd_device *device = octx->device;
1272 struct issue_flush_context *ctx = octx->ctx;
1274 if (bio->bi_status) {
1275 ctx->error = blk_status_to_errno(bio->bi_status);
1276 drbd_info(device, "local disk FLUSH FAILED with status %d\n", bio->bi_status);
1278 kfree(octx);
1279 bio_put(bio);
1281 clear_bit(FLUSH_PENDING, &device->flags);
1282 put_ldev(device);
1283 kref_put(&device->kref, drbd_destroy_device);
1285 if (atomic_dec_and_test(&ctx->pending))
1286 complete(&ctx->done);
1289 static void submit_one_flush(struct drbd_device *device, struct issue_flush_context *ctx)
1291 struct bio *bio = bio_alloc(GFP_NOIO, 0);
1292 struct one_flush_context *octx = kmalloc(sizeof(*octx), GFP_NOIO);
1293 if (!bio || !octx) {
1294 drbd_warn(device, "Could not allocate a bio, CANNOT ISSUE FLUSH\n");
1295 /* FIXME: what else can I do now? disconnecting or detaching
1296 * really does not help to improve the state of the world, either.
1298 kfree(octx);
1299 if (bio)
1300 bio_put(bio);
1302 ctx->error = -ENOMEM;
1303 put_ldev(device);
1304 kref_put(&device->kref, drbd_destroy_device);
1305 return;
1308 octx->device = device;
1309 octx->ctx = ctx;
1310 bio_set_dev(bio, device->ldev->backing_bdev);
1311 bio->bi_private = octx;
1312 bio->bi_end_io = one_flush_endio;
1313 bio->bi_opf = REQ_OP_FLUSH | REQ_PREFLUSH;
1315 device->flush_jif = jiffies;
1316 set_bit(FLUSH_PENDING, &device->flags);
1317 atomic_inc(&ctx->pending);
1318 submit_bio(bio);
1321 static void drbd_flush(struct drbd_connection *connection)
1323 if (connection->resource->write_ordering >= WO_BDEV_FLUSH) {
1324 struct drbd_peer_device *peer_device;
1325 struct issue_flush_context ctx;
1326 int vnr;
1328 atomic_set(&ctx.pending, 1);
1329 ctx.error = 0;
1330 init_completion(&ctx.done);
1332 rcu_read_lock();
1333 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1334 struct drbd_device *device = peer_device->device;
1336 if (!get_ldev(device))
1337 continue;
1338 kref_get(&device->kref);
1339 rcu_read_unlock();
1341 submit_one_flush(device, &ctx);
1343 rcu_read_lock();
1345 rcu_read_unlock();
1347 /* Do we want to add a timeout,
1348 * if disk-timeout is set? */
1349 if (!atomic_dec_and_test(&ctx.pending))
1350 wait_for_completion(&ctx.done);
1352 if (ctx.error) {
1353 /* would rather check on EOPNOTSUPP, but that is not reliable.
1354 * don't try again for ANY return value != 0
1355 * if (rv == -EOPNOTSUPP) */
1356 /* Any error is already reported by bio_endio callback. */
1357 drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
1363 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1364 * @device: DRBD device.
1365 * @epoch: Epoch object.
1366 * @ev: Epoch event.
1368 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1369 struct drbd_epoch *epoch,
1370 enum epoch_event ev)
1372 int epoch_size;
1373 struct drbd_epoch *next_epoch;
1374 enum finish_epoch rv = FE_STILL_LIVE;
1376 spin_lock(&connection->epoch_lock);
1377 do {
1378 next_epoch = NULL;
1380 epoch_size = atomic_read(&epoch->epoch_size);
1382 switch (ev & ~EV_CLEANUP) {
1383 case EV_PUT:
1384 atomic_dec(&epoch->active);
1385 break;
1386 case EV_GOT_BARRIER_NR:
1387 set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1388 break;
1389 case EV_BECAME_LAST:
1390 /* nothing to do*/
1391 break;
1394 if (epoch_size != 0 &&
1395 atomic_read(&epoch->active) == 0 &&
1396 (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1397 if (!(ev & EV_CLEANUP)) {
1398 spin_unlock(&connection->epoch_lock);
1399 drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1400 spin_lock(&connection->epoch_lock);
1402 #if 0
1403 /* FIXME: dec unacked on connection, once we have
1404 * something to count pending connection packets in. */
1405 if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1406 dec_unacked(epoch->connection);
1407 #endif
1409 if (connection->current_epoch != epoch) {
1410 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1411 list_del(&epoch->list);
1412 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1413 connection->epochs--;
1414 kfree(epoch);
1416 if (rv == FE_STILL_LIVE)
1417 rv = FE_DESTROYED;
1418 } else {
1419 epoch->flags = 0;
1420 atomic_set(&epoch->epoch_size, 0);
1421 /* atomic_set(&epoch->active, 0); is already zero */
1422 if (rv == FE_STILL_LIVE)
1423 rv = FE_RECYCLED;
1427 if (!next_epoch)
1428 break;
1430 epoch = next_epoch;
1431 } while (1);
1433 spin_unlock(&connection->epoch_lock);
1435 return rv;
1438 static enum write_ordering_e
1439 max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
1441 struct disk_conf *dc;
1443 dc = rcu_dereference(bdev->disk_conf);
1445 if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
1446 wo = WO_DRAIN_IO;
1447 if (wo == WO_DRAIN_IO && !dc->disk_drain)
1448 wo = WO_NONE;
1450 return wo;
1454 * drbd_bump_write_ordering() - Fall back to an other write ordering method
1455 * @connection: DRBD connection.
1456 * @wo: Write ordering method to try.
1458 void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
1459 enum write_ordering_e wo)
1461 struct drbd_device *device;
1462 enum write_ordering_e pwo;
1463 int vnr;
1464 static char *write_ordering_str[] = {
1465 [WO_NONE] = "none",
1466 [WO_DRAIN_IO] = "drain",
1467 [WO_BDEV_FLUSH] = "flush",
1470 pwo = resource->write_ordering;
1471 if (wo != WO_BDEV_FLUSH)
1472 wo = min(pwo, wo);
1473 rcu_read_lock();
1474 idr_for_each_entry(&resource->devices, device, vnr) {
1475 if (get_ldev(device)) {
1476 wo = max_allowed_wo(device->ldev, wo);
1477 if (device->ldev == bdev)
1478 bdev = NULL;
1479 put_ldev(device);
1483 if (bdev)
1484 wo = max_allowed_wo(bdev, wo);
1486 rcu_read_unlock();
1488 resource->write_ordering = wo;
1489 if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH)
1490 drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
1494 * Mapping "discard" to ZEROOUT with UNMAP does not work for us:
1495 * Drivers have to "announce" q->limits.max_write_zeroes_sectors, or it
1496 * will directly go to fallback mode, submitting normal writes, and
1497 * never even try to UNMAP.
1499 * And dm-thin does not do this (yet), mostly because in general it has
1500 * to assume that "skip_block_zeroing" is set. See also:
1501 * https://www.mail-archive.com/dm-devel%40redhat.com/msg07965.html
1502 * https://www.redhat.com/archives/dm-devel/2018-January/msg00271.html
1504 * We *may* ignore the discard-zeroes-data setting, if so configured.
1506 * Assumption is that this "discard_zeroes_data=0" is only because the backend
1507 * may ignore partial unaligned discards.
1509 * LVM/DM thin as of at least
1510 * LVM version: 2.02.115(2)-RHEL7 (2015-01-28)
1511 * Library version: 1.02.93-RHEL7 (2015-01-28)
1512 * Driver version: 4.29.0
1513 * still behaves this way.
1515 * For unaligned (wrt. alignment and granularity) or too small discards,
1516 * we zero-out the initial (and/or) trailing unaligned partial chunks,
1517 * but discard all the aligned full chunks.
1519 * At least for LVM/DM thin, with skip_block_zeroing=false,
1520 * the result is effectively "discard_zeroes_data=1".
1522 /* flags: EE_TRIM|EE_ZEROOUT */
1523 int drbd_issue_discard_or_zero_out(struct drbd_device *device, sector_t start, unsigned int nr_sectors, int flags)
1525 struct block_device *bdev = device->ldev->backing_bdev;
1526 struct request_queue *q = bdev_get_queue(bdev);
1527 sector_t tmp, nr;
1528 unsigned int max_discard_sectors, granularity;
1529 int alignment;
1530 int err = 0;
1532 if ((flags & EE_ZEROOUT) || !(flags & EE_TRIM))
1533 goto zero_out;
1535 /* Zero-sector (unknown) and one-sector granularities are the same. */
1536 granularity = max(q->limits.discard_granularity >> 9, 1U);
1537 alignment = (bdev_discard_alignment(bdev) >> 9) % granularity;
1539 max_discard_sectors = min(q->limits.max_discard_sectors, (1U << 22));
1540 max_discard_sectors -= max_discard_sectors % granularity;
1541 if (unlikely(!max_discard_sectors))
1542 goto zero_out;
1544 if (nr_sectors < granularity)
1545 goto zero_out;
1547 tmp = start;
1548 if (sector_div(tmp, granularity) != alignment) {
1549 if (nr_sectors < 2*granularity)
1550 goto zero_out;
1551 /* start + gran - (start + gran - align) % gran */
1552 tmp = start + granularity - alignment;
1553 tmp = start + granularity - sector_div(tmp, granularity);
1555 nr = tmp - start;
1556 /* don't flag BLKDEV_ZERO_NOUNMAP, we don't know how many
1557 * layers are below us, some may have smaller granularity */
1558 err |= blkdev_issue_zeroout(bdev, start, nr, GFP_NOIO, 0);
1559 nr_sectors -= nr;
1560 start = tmp;
1562 while (nr_sectors >= max_discard_sectors) {
1563 err |= blkdev_issue_discard(bdev, start, max_discard_sectors, GFP_NOIO, 0);
1564 nr_sectors -= max_discard_sectors;
1565 start += max_discard_sectors;
1567 if (nr_sectors) {
1568 /* max_discard_sectors is unsigned int (and a multiple of
1569 * granularity, we made sure of that above already);
1570 * nr is < max_discard_sectors;
1571 * I don't need sector_div here, even though nr is sector_t */
1572 nr = nr_sectors;
1573 nr -= (unsigned int)nr % granularity;
1574 if (nr) {
1575 err |= blkdev_issue_discard(bdev, start, nr, GFP_NOIO, 0);
1576 nr_sectors -= nr;
1577 start += nr;
1580 zero_out:
1581 if (nr_sectors) {
1582 err |= blkdev_issue_zeroout(bdev, start, nr_sectors, GFP_NOIO,
1583 (flags & EE_TRIM) ? 0 : BLKDEV_ZERO_NOUNMAP);
1585 return err != 0;
1588 static bool can_do_reliable_discards(struct drbd_device *device)
1590 struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
1591 struct disk_conf *dc;
1592 bool can_do;
1594 if (!blk_queue_discard(q))
1595 return false;
1597 rcu_read_lock();
1598 dc = rcu_dereference(device->ldev->disk_conf);
1599 can_do = dc->discard_zeroes_if_aligned;
1600 rcu_read_unlock();
1601 return can_do;
1604 static void drbd_issue_peer_discard_or_zero_out(struct drbd_device *device, struct drbd_peer_request *peer_req)
1606 /* If the backend cannot discard, or does not guarantee
1607 * read-back zeroes in discarded ranges, we fall back to
1608 * zero-out. Unless configuration specifically requested
1609 * otherwise. */
1610 if (!can_do_reliable_discards(device))
1611 peer_req->flags |= EE_ZEROOUT;
1613 if (drbd_issue_discard_or_zero_out(device, peer_req->i.sector,
1614 peer_req->i.size >> 9, peer_req->flags & (EE_ZEROOUT|EE_TRIM)))
1615 peer_req->flags |= EE_WAS_ERROR;
1616 drbd_endio_write_sec_final(peer_req);
1619 static void drbd_issue_peer_wsame(struct drbd_device *device,
1620 struct drbd_peer_request *peer_req)
1622 struct block_device *bdev = device->ldev->backing_bdev;
1623 sector_t s = peer_req->i.sector;
1624 sector_t nr = peer_req->i.size >> 9;
1625 if (blkdev_issue_write_same(bdev, s, nr, GFP_NOIO, peer_req->pages))
1626 peer_req->flags |= EE_WAS_ERROR;
1627 drbd_endio_write_sec_final(peer_req);
1632 * drbd_submit_peer_request()
1633 * @device: DRBD device.
1634 * @peer_req: peer request
1635 * @rw: flag field, see bio->bi_opf
1637 * May spread the pages to multiple bios,
1638 * depending on bio_add_page restrictions.
1640 * Returns 0 if all bios have been submitted,
1641 * -ENOMEM if we could not allocate enough bios,
1642 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1643 * single page to an empty bio (which should never happen and likely indicates
1644 * that the lower level IO stack is in some way broken). This has been observed
1645 * on certain Xen deployments.
1647 /* TODO allocate from our own bio_set. */
1648 int drbd_submit_peer_request(struct drbd_device *device,
1649 struct drbd_peer_request *peer_req,
1650 const unsigned op, const unsigned op_flags,
1651 const int fault_type)
1653 struct bio *bios = NULL;
1654 struct bio *bio;
1655 struct page *page = peer_req->pages;
1656 sector_t sector = peer_req->i.sector;
1657 unsigned data_size = peer_req->i.size;
1658 unsigned n_bios = 0;
1659 unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
1660 int err = -ENOMEM;
1662 /* TRIM/DISCARD: for now, always use the helper function
1663 * blkdev_issue_zeroout(..., discard=true).
1664 * It's synchronous, but it does the right thing wrt. bio splitting.
1665 * Correctness first, performance later. Next step is to code an
1666 * asynchronous variant of the same.
1668 if (peer_req->flags & (EE_TRIM|EE_WRITE_SAME|EE_ZEROOUT)) {
1669 /* wait for all pending IO completions, before we start
1670 * zeroing things out. */
1671 conn_wait_active_ee_empty(peer_req->peer_device->connection);
1672 /* add it to the active list now,
1673 * so we can find it to present it in debugfs */
1674 peer_req->submit_jif = jiffies;
1675 peer_req->flags |= EE_SUBMITTED;
1677 /* If this was a resync request from receive_rs_deallocated(),
1678 * it is already on the sync_ee list */
1679 if (list_empty(&peer_req->w.list)) {
1680 spin_lock_irq(&device->resource->req_lock);
1681 list_add_tail(&peer_req->w.list, &device->active_ee);
1682 spin_unlock_irq(&device->resource->req_lock);
1685 if (peer_req->flags & (EE_TRIM|EE_ZEROOUT))
1686 drbd_issue_peer_discard_or_zero_out(device, peer_req);
1687 else /* EE_WRITE_SAME */
1688 drbd_issue_peer_wsame(device, peer_req);
1689 return 0;
1692 /* In most cases, we will only need one bio. But in case the lower
1693 * level restrictions happen to be different at this offset on this
1694 * side than those of the sending peer, we may need to submit the
1695 * request in more than one bio.
1697 * Plain bio_alloc is good enough here, this is no DRBD internally
1698 * generated bio, but a bio allocated on behalf of the peer.
1700 next_bio:
1701 bio = bio_alloc(GFP_NOIO, nr_pages);
1702 if (!bio) {
1703 drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
1704 goto fail;
1706 /* > peer_req->i.sector, unless this is the first bio */
1707 bio->bi_iter.bi_sector = sector;
1708 bio_set_dev(bio, device->ldev->backing_bdev);
1709 bio_set_op_attrs(bio, op, op_flags);
1710 bio->bi_private = peer_req;
1711 bio->bi_end_io = drbd_peer_request_endio;
1713 bio->bi_next = bios;
1714 bios = bio;
1715 ++n_bios;
1717 page_chain_for_each(page) {
1718 unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
1719 if (!bio_add_page(bio, page, len, 0))
1720 goto next_bio;
1721 data_size -= len;
1722 sector += len >> 9;
1723 --nr_pages;
1725 D_ASSERT(device, data_size == 0);
1726 D_ASSERT(device, page == NULL);
1728 atomic_set(&peer_req->pending_bios, n_bios);
1729 /* for debugfs: update timestamp, mark as submitted */
1730 peer_req->submit_jif = jiffies;
1731 peer_req->flags |= EE_SUBMITTED;
1732 do {
1733 bio = bios;
1734 bios = bios->bi_next;
1735 bio->bi_next = NULL;
1737 drbd_generic_make_request(device, fault_type, bio);
1738 } while (bios);
1739 return 0;
1741 fail:
1742 while (bios) {
1743 bio = bios;
1744 bios = bios->bi_next;
1745 bio_put(bio);
1747 return err;
1750 static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1751 struct drbd_peer_request *peer_req)
1753 struct drbd_interval *i = &peer_req->i;
1755 drbd_remove_interval(&device->write_requests, i);
1756 drbd_clear_interval(i);
1758 /* Wake up any processes waiting for this peer request to complete. */
1759 if (i->waiting)
1760 wake_up(&device->misc_wait);
1763 static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1765 struct drbd_peer_device *peer_device;
1766 int vnr;
1768 rcu_read_lock();
1769 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1770 struct drbd_device *device = peer_device->device;
1772 kref_get(&device->kref);
1773 rcu_read_unlock();
1774 drbd_wait_ee_list_empty(device, &device->active_ee);
1775 kref_put(&device->kref, drbd_destroy_device);
1776 rcu_read_lock();
1778 rcu_read_unlock();
1781 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1783 int rv;
1784 struct p_barrier *p = pi->data;
1785 struct drbd_epoch *epoch;
1787 /* FIXME these are unacked on connection,
1788 * not a specific (peer)device.
1790 connection->current_epoch->barrier_nr = p->barrier;
1791 connection->current_epoch->connection = connection;
1792 rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1794 /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1795 * the activity log, which means it would not be resynced in case the
1796 * R_PRIMARY crashes now.
1797 * Therefore we must send the barrier_ack after the barrier request was
1798 * completed. */
1799 switch (connection->resource->write_ordering) {
1800 case WO_NONE:
1801 if (rv == FE_RECYCLED)
1802 return 0;
1804 /* receiver context, in the writeout path of the other node.
1805 * avoid potential distributed deadlock */
1806 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1807 if (epoch)
1808 break;
1809 else
1810 drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1811 /* Fall through */
1813 case WO_BDEV_FLUSH:
1814 case WO_DRAIN_IO:
1815 conn_wait_active_ee_empty(connection);
1816 drbd_flush(connection);
1818 if (atomic_read(&connection->current_epoch->epoch_size)) {
1819 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1820 if (epoch)
1821 break;
1824 return 0;
1825 default:
1826 drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
1827 connection->resource->write_ordering);
1828 return -EIO;
1831 epoch->flags = 0;
1832 atomic_set(&epoch->epoch_size, 0);
1833 atomic_set(&epoch->active, 0);
1835 spin_lock(&connection->epoch_lock);
1836 if (atomic_read(&connection->current_epoch->epoch_size)) {
1837 list_add(&epoch->list, &connection->current_epoch->list);
1838 connection->current_epoch = epoch;
1839 connection->epochs++;
1840 } else {
1841 /* The current_epoch got recycled while we allocated this one... */
1842 kfree(epoch);
1844 spin_unlock(&connection->epoch_lock);
1846 return 0;
1849 /* quick wrapper in case payload size != request_size (write same) */
1850 static void drbd_csum_ee_size(struct crypto_shash *h,
1851 struct drbd_peer_request *r, void *d,
1852 unsigned int payload_size)
1854 unsigned int tmp = r->i.size;
1855 r->i.size = payload_size;
1856 drbd_csum_ee(h, r, d);
1857 r->i.size = tmp;
1860 /* used from receive_RSDataReply (recv_resync_read)
1861 * and from receive_Data.
1862 * data_size: actual payload ("data in")
1863 * for normal writes that is bi_size.
1864 * for discards, that is zero.
1865 * for write same, it is logical_block_size.
1866 * both trim and write same have the bi_size ("data len to be affected")
1867 * as extra argument in the packet header.
1869 static struct drbd_peer_request *
1870 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1871 struct packet_info *pi) __must_hold(local)
1873 struct drbd_device *device = peer_device->device;
1874 const sector_t capacity = drbd_get_capacity(device->this_bdev);
1875 struct drbd_peer_request *peer_req;
1876 struct page *page;
1877 int digest_size, err;
1878 unsigned int data_size = pi->size, ds;
1879 void *dig_in = peer_device->connection->int_dig_in;
1880 void *dig_vv = peer_device->connection->int_dig_vv;
1881 unsigned long *data;
1882 struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1883 struct p_trim *zeroes = (pi->cmd == P_ZEROES) ? pi->data : NULL;
1884 struct p_trim *wsame = (pi->cmd == P_WSAME) ? pi->data : NULL;
1886 digest_size = 0;
1887 if (!trim && peer_device->connection->peer_integrity_tfm) {
1888 digest_size = crypto_shash_digestsize(peer_device->connection->peer_integrity_tfm);
1890 * FIXME: Receive the incoming digest into the receive buffer
1891 * here, together with its struct p_data?
1893 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1894 if (err)
1895 return NULL;
1896 data_size -= digest_size;
1899 /* assume request_size == data_size, but special case trim and wsame. */
1900 ds = data_size;
1901 if (trim) {
1902 if (!expect(data_size == 0))
1903 return NULL;
1904 ds = be32_to_cpu(trim->size);
1905 } else if (zeroes) {
1906 if (!expect(data_size == 0))
1907 return NULL;
1908 ds = be32_to_cpu(zeroes->size);
1909 } else if (wsame) {
1910 if (data_size != queue_logical_block_size(device->rq_queue)) {
1911 drbd_err(peer_device, "data size (%u) != drbd logical block size (%u)\n",
1912 data_size, queue_logical_block_size(device->rq_queue));
1913 return NULL;
1915 if (data_size != bdev_logical_block_size(device->ldev->backing_bdev)) {
1916 drbd_err(peer_device, "data size (%u) != backend logical block size (%u)\n",
1917 data_size, bdev_logical_block_size(device->ldev->backing_bdev));
1918 return NULL;
1920 ds = be32_to_cpu(wsame->size);
1923 if (!expect(IS_ALIGNED(ds, 512)))
1924 return NULL;
1925 if (trim || wsame || zeroes) {
1926 if (!expect(ds <= (DRBD_MAX_BBIO_SECTORS << 9)))
1927 return NULL;
1928 } else if (!expect(ds <= DRBD_MAX_BIO_SIZE))
1929 return NULL;
1931 /* even though we trust out peer,
1932 * we sometimes have to double check. */
1933 if (sector + (ds>>9) > capacity) {
1934 drbd_err(device, "request from peer beyond end of local disk: "
1935 "capacity: %llus < sector: %llus + size: %u\n",
1936 (unsigned long long)capacity,
1937 (unsigned long long)sector, ds);
1938 return NULL;
1941 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1942 * "criss-cross" setup, that might cause write-out on some other DRBD,
1943 * which in turn might block on the other node at this very place. */
1944 peer_req = drbd_alloc_peer_req(peer_device, id, sector, ds, data_size, GFP_NOIO);
1945 if (!peer_req)
1946 return NULL;
1948 peer_req->flags |= EE_WRITE;
1949 if (trim) {
1950 peer_req->flags |= EE_TRIM;
1951 return peer_req;
1953 if (zeroes) {
1954 peer_req->flags |= EE_ZEROOUT;
1955 return peer_req;
1957 if (wsame)
1958 peer_req->flags |= EE_WRITE_SAME;
1960 /* receive payload size bytes into page chain */
1961 ds = data_size;
1962 page = peer_req->pages;
1963 page_chain_for_each(page) {
1964 unsigned len = min_t(int, ds, PAGE_SIZE);
1965 data = kmap(page);
1966 err = drbd_recv_all_warn(peer_device->connection, data, len);
1967 if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1968 drbd_err(device, "Fault injection: Corrupting data on receive\n");
1969 data[0] = data[0] ^ (unsigned long)-1;
1971 kunmap(page);
1972 if (err) {
1973 drbd_free_peer_req(device, peer_req);
1974 return NULL;
1976 ds -= len;
1979 if (digest_size) {
1980 drbd_csum_ee_size(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv, data_size);
1981 if (memcmp(dig_in, dig_vv, digest_size)) {
1982 drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1983 (unsigned long long)sector, data_size);
1984 drbd_free_peer_req(device, peer_req);
1985 return NULL;
1988 device->recv_cnt += data_size >> 9;
1989 return peer_req;
1992 /* drbd_drain_block() just takes a data block
1993 * out of the socket input buffer, and discards it.
1995 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1997 struct page *page;
1998 int err = 0;
1999 void *data;
2001 if (!data_size)
2002 return 0;
2004 page = drbd_alloc_pages(peer_device, 1, 1);
2006 data = kmap(page);
2007 while (data_size) {
2008 unsigned int len = min_t(int, data_size, PAGE_SIZE);
2010 err = drbd_recv_all_warn(peer_device->connection, data, len);
2011 if (err)
2012 break;
2013 data_size -= len;
2015 kunmap(page);
2016 drbd_free_pages(peer_device->device, page, 0);
2017 return err;
2020 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
2021 sector_t sector, int data_size)
2023 struct bio_vec bvec;
2024 struct bvec_iter iter;
2025 struct bio *bio;
2026 int digest_size, err, expect;
2027 void *dig_in = peer_device->connection->int_dig_in;
2028 void *dig_vv = peer_device->connection->int_dig_vv;
2030 digest_size = 0;
2031 if (peer_device->connection->peer_integrity_tfm) {
2032 digest_size = crypto_shash_digestsize(peer_device->connection->peer_integrity_tfm);
2033 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
2034 if (err)
2035 return err;
2036 data_size -= digest_size;
2039 /* optimistically update recv_cnt. if receiving fails below,
2040 * we disconnect anyways, and counters will be reset. */
2041 peer_device->device->recv_cnt += data_size>>9;
2043 bio = req->master_bio;
2044 D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
2046 bio_for_each_segment(bvec, bio, iter) {
2047 void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
2048 expect = min_t(int, data_size, bvec.bv_len);
2049 err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
2050 kunmap(bvec.bv_page);
2051 if (err)
2052 return err;
2053 data_size -= expect;
2056 if (digest_size) {
2057 drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
2058 if (memcmp(dig_in, dig_vv, digest_size)) {
2059 drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
2060 return -EINVAL;
2064 D_ASSERT(peer_device->device, data_size == 0);
2065 return 0;
2069 * e_end_resync_block() is called in ack_sender context via
2070 * drbd_finish_peer_reqs().
2072 static int e_end_resync_block(struct drbd_work *w, int unused)
2074 struct drbd_peer_request *peer_req =
2075 container_of(w, struct drbd_peer_request, w);
2076 struct drbd_peer_device *peer_device = peer_req->peer_device;
2077 struct drbd_device *device = peer_device->device;
2078 sector_t sector = peer_req->i.sector;
2079 int err;
2081 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2083 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2084 drbd_set_in_sync(device, sector, peer_req->i.size);
2085 err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
2086 } else {
2087 /* Record failure to sync */
2088 drbd_rs_failed_io(device, sector, peer_req->i.size);
2090 err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2092 dec_unacked(device);
2094 return err;
2097 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
2098 struct packet_info *pi) __releases(local)
2100 struct drbd_device *device = peer_device->device;
2101 struct drbd_peer_request *peer_req;
2103 peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
2104 if (!peer_req)
2105 goto fail;
2107 dec_rs_pending(device);
2109 inc_unacked(device);
2110 /* corresponding dec_unacked() in e_end_resync_block()
2111 * respective _drbd_clear_done_ee */
2113 peer_req->w.cb = e_end_resync_block;
2114 peer_req->submit_jif = jiffies;
2116 spin_lock_irq(&device->resource->req_lock);
2117 list_add_tail(&peer_req->w.list, &device->sync_ee);
2118 spin_unlock_irq(&device->resource->req_lock);
2120 atomic_add(pi->size >> 9, &device->rs_sect_ev);
2121 if (drbd_submit_peer_request(device, peer_req, REQ_OP_WRITE, 0,
2122 DRBD_FAULT_RS_WR) == 0)
2123 return 0;
2125 /* don't care for the reason here */
2126 drbd_err(device, "submit failed, triggering re-connect\n");
2127 spin_lock_irq(&device->resource->req_lock);
2128 list_del(&peer_req->w.list);
2129 spin_unlock_irq(&device->resource->req_lock);
2131 drbd_free_peer_req(device, peer_req);
2132 fail:
2133 put_ldev(device);
2134 return -EIO;
2137 static struct drbd_request *
2138 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
2139 sector_t sector, bool missing_ok, const char *func)
2141 struct drbd_request *req;
2143 /* Request object according to our peer */
2144 req = (struct drbd_request *)(unsigned long)id;
2145 if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
2146 return req;
2147 if (!missing_ok) {
2148 drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
2149 (unsigned long)id, (unsigned long long)sector);
2151 return NULL;
2154 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
2156 struct drbd_peer_device *peer_device;
2157 struct drbd_device *device;
2158 struct drbd_request *req;
2159 sector_t sector;
2160 int err;
2161 struct p_data *p = pi->data;
2163 peer_device = conn_peer_device(connection, pi->vnr);
2164 if (!peer_device)
2165 return -EIO;
2166 device = peer_device->device;
2168 sector = be64_to_cpu(p->sector);
2170 spin_lock_irq(&device->resource->req_lock);
2171 req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
2172 spin_unlock_irq(&device->resource->req_lock);
2173 if (unlikely(!req))
2174 return -EIO;
2176 /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
2177 * special casing it there for the various failure cases.
2178 * still no race with drbd_fail_pending_reads */
2179 err = recv_dless_read(peer_device, req, sector, pi->size);
2180 if (!err)
2181 req_mod(req, DATA_RECEIVED);
2182 /* else: nothing. handled from drbd_disconnect...
2183 * I don't think we may complete this just yet
2184 * in case we are "on-disconnect: freeze" */
2186 return err;
2189 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
2191 struct drbd_peer_device *peer_device;
2192 struct drbd_device *device;
2193 sector_t sector;
2194 int err;
2195 struct p_data *p = pi->data;
2197 peer_device = conn_peer_device(connection, pi->vnr);
2198 if (!peer_device)
2199 return -EIO;
2200 device = peer_device->device;
2202 sector = be64_to_cpu(p->sector);
2203 D_ASSERT(device, p->block_id == ID_SYNCER);
2205 if (get_ldev(device)) {
2206 /* data is submitted to disk within recv_resync_read.
2207 * corresponding put_ldev done below on error,
2208 * or in drbd_peer_request_endio. */
2209 err = recv_resync_read(peer_device, sector, pi);
2210 } else {
2211 if (__ratelimit(&drbd_ratelimit_state))
2212 drbd_err(device, "Can not write resync data to local disk.\n");
2214 err = drbd_drain_block(peer_device, pi->size);
2216 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2219 atomic_add(pi->size >> 9, &device->rs_sect_in);
2221 return err;
2224 static void restart_conflicting_writes(struct drbd_device *device,
2225 sector_t sector, int size)
2227 struct drbd_interval *i;
2228 struct drbd_request *req;
2230 drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2231 if (!i->local)
2232 continue;
2233 req = container_of(i, struct drbd_request, i);
2234 if (req->rq_state & RQ_LOCAL_PENDING ||
2235 !(req->rq_state & RQ_POSTPONED))
2236 continue;
2237 /* as it is RQ_POSTPONED, this will cause it to
2238 * be queued on the retry workqueue. */
2239 __req_mod(req, CONFLICT_RESOLVED, NULL);
2244 * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
2246 static int e_end_block(struct drbd_work *w, int cancel)
2248 struct drbd_peer_request *peer_req =
2249 container_of(w, struct drbd_peer_request, w);
2250 struct drbd_peer_device *peer_device = peer_req->peer_device;
2251 struct drbd_device *device = peer_device->device;
2252 sector_t sector = peer_req->i.sector;
2253 int err = 0, pcmd;
2255 if (peer_req->flags & EE_SEND_WRITE_ACK) {
2256 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2257 pcmd = (device->state.conn >= C_SYNC_SOURCE &&
2258 device->state.conn <= C_PAUSED_SYNC_T &&
2259 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
2260 P_RS_WRITE_ACK : P_WRITE_ACK;
2261 err = drbd_send_ack(peer_device, pcmd, peer_req);
2262 if (pcmd == P_RS_WRITE_ACK)
2263 drbd_set_in_sync(device, sector, peer_req->i.size);
2264 } else {
2265 err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2266 /* we expect it to be marked out of sync anyways...
2267 * maybe assert this? */
2269 dec_unacked(device);
2272 /* we delete from the conflict detection hash _after_ we sent out the
2273 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
2274 if (peer_req->flags & EE_IN_INTERVAL_TREE) {
2275 spin_lock_irq(&device->resource->req_lock);
2276 D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
2277 drbd_remove_epoch_entry_interval(device, peer_req);
2278 if (peer_req->flags & EE_RESTART_REQUESTS)
2279 restart_conflicting_writes(device, sector, peer_req->i.size);
2280 spin_unlock_irq(&device->resource->req_lock);
2281 } else
2282 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2284 drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
2286 return err;
2289 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
2291 struct drbd_peer_request *peer_req =
2292 container_of(w, struct drbd_peer_request, w);
2293 struct drbd_peer_device *peer_device = peer_req->peer_device;
2294 int err;
2296 err = drbd_send_ack(peer_device, ack, peer_req);
2297 dec_unacked(peer_device->device);
2299 return err;
2302 static int e_send_superseded(struct drbd_work *w, int unused)
2304 return e_send_ack(w, P_SUPERSEDED);
2307 static int e_send_retry_write(struct drbd_work *w, int unused)
2309 struct drbd_peer_request *peer_req =
2310 container_of(w, struct drbd_peer_request, w);
2311 struct drbd_connection *connection = peer_req->peer_device->connection;
2313 return e_send_ack(w, connection->agreed_pro_version >= 100 ?
2314 P_RETRY_WRITE : P_SUPERSEDED);
/*
 * seq_greater() - wrap-around aware "a is newer than b" for sequence numbers.
 * @a: first sequence number
 * @b: second sequence number
 *
 * Return: true if @a is ahead of @b by 1 .. 2^31 - 1 (modulo 2^32).
 */
static bool seq_greater(u32 a, u32 b)
{
	/*
	 * We assume 32-bit wrap-around here.
	 * For 24-bit wrap-around, we would have to shift:
	 *  a <<= 8; b <<= 8;
	 *
	 * Subtract in unsigned arithmetic, where wrap-around is well defined,
	 * and reinterpret the difference as signed.  Subtracting two values
	 * that were already cast to s32 can overflow, which is undefined
	 * behavior in ISO C.
	 */
	return (s32)(a - b) > 0;
}
2327 static u32 seq_max(u32 a, u32 b)
2329 return seq_greater(a, b) ? a : b;
2332 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
2334 struct drbd_device *device = peer_device->device;
2335 unsigned int newest_peer_seq;
2337 if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
2338 spin_lock(&device->peer_seq_lock);
2339 newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2340 device->peer_seq = newest_peer_seq;
2341 spin_unlock(&device->peer_seq_lock);
2342 /* wake up only if we actually changed device->peer_seq */
2343 if (peer_seq == newest_peer_seq)
2344 wake_up(&device->seq_wait);
2348 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
2350 return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
2353 /* maybe change sync_ee into interval trees as well? */
2354 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2356 struct drbd_peer_request *rs_req;
2357 bool rv = false;
2359 spin_lock_irq(&device->resource->req_lock);
2360 list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2361 if (overlaps(peer_req->i.sector, peer_req->i.size,
2362 rs_req->i.sector, rs_req->i.size)) {
2363 rv = true;
2364 break;
2367 spin_unlock_irq(&device->resource->req_lock);
2369 return rv;
2372 /* Called from receive_Data.
2373 * Synchronize packets on sock with packets on msock.
2375 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
2376 * packet traveling on msock, they are still processed in the order they have
2377 * been sent.
2379 * Note: we don't care for Ack packets overtaking P_DATA packets.
2381 * In case packet_seq is larger than device->peer_seq number, there are
2382 * outstanding packets on the msock. We wait for them to arrive.
2383 * In case we are the logically next packet, we update device->peer_seq
2384 * ourselves. Correctly handles 32bit wrap around.
2386 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2387 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2388 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2389 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
2391 * returns 0 if we may process the packet,
2392 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
/*
 * wait_for_and_update_peer_seq() - order a data packet against device->peer_seq.
 * @peer_device: peer device the packet belongs to
 * @peer_seq:    sequence number carried by the packet
 *
 * Returns 0 immediately unless RESOLVE_CONFLICTS is set on the connection.
 * Otherwise loops under peer_seq_lock until (peer_seq - 1) is no longer ahead
 * of device->peer_seq, then stores the newer of the two values.
 *
 * Return: 0 if the caller may proceed (also when two_primaries is off, in
 * which case device->peer_seq is left untouched), -ERESTARTSYS if a signal is
 * pending, -ETIMEDOUT if schedule_timeout() returned 0.
 */
2393 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2395 struct drbd_device *device = peer_device->device;
2396 DEFINE_WAIT(wait);
2397 long timeout;
2398 int ret = 0, tp;
/* Nothing to do unless RESOLVE_CONFLICTS is set on this connection. */
2400 if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2401 return 0;
2403 spin_lock(&device->peer_seq_lock);
2404 for (;;) {
/* Done once the predecessor of peer_seq is not ahead of what we have seen. */
2405 if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2406 device->peer_seq = seq_max(device->peer_seq, peer_seq);
2407 break;
2410 if (signal_pending(current)) {
2411 ret = -ERESTARTSYS;
2412 break;
/* net_conf is RCU protected; only the two_primaries value is copied out. */
2415 rcu_read_lock();
2416 tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2417 rcu_read_unlock();
2419 if (!tp)
2420 break;
2422 /* Only need to wait if two_primaries is enabled */
/* Register on seq_wait before dropping the lock; update_peer_seq() wakes it. */
2423 prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2424 spin_unlock(&device->peer_seq_lock);
/* Sleep for at most ping_timeo * HZ / 10 jiffies. */
2425 rcu_read_lock();
2426 timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2427 rcu_read_unlock();
2428 timeout = schedule_timeout(timeout);
2429 spin_lock(&device->peer_seq_lock);
/* schedule_timeout() returning 0 means the whole timeout elapsed. */
2430 if (!timeout) {
2431 ret = -ETIMEDOUT;
2432 drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2433 break;
/* All loop exits arrive here with peer_seq_lock held. */
2436 spin_unlock(&device->peer_seq_lock);
2437 finish_wait(&device->seq_wait, &wait);
2438 return ret;
2441 /* see also bio_flags_to_wire()
2442 * DRBD_REQ_*, because we need to semantically map the flags to data packet
2443 * flags and back. We may replicate to other kernel versions. */
2444 static unsigned long wire_flags_to_bio_flags(u32 dpf)
2446 return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2447 (dpf & DP_FUA ? REQ_FUA : 0) |
2448 (dpf & DP_FLUSH ? REQ_PREFLUSH : 0);
2451 static unsigned long wire_flags_to_bio_op(u32 dpf)
2453 if (dpf & DP_ZEROES)
2454 return REQ_OP_WRITE_ZEROES;
2455 if (dpf & DP_DISCARD)
2456 return REQ_OP_DISCARD;
2457 if (dpf & DP_WSAME)
2458 return REQ_OP_WRITE_SAME;
2459 else
2460 return REQ_OP_WRITE;
2463 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2464 unsigned int size)
2466 struct drbd_interval *i;
2468 repeat:
2469 drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2470 struct drbd_request *req;
2471 struct bio_and_error m;
2473 if (!i->local)
2474 continue;
2475 req = container_of(i, struct drbd_request, i);
2476 if (!(req->rq_state & RQ_POSTPONED))
2477 continue;
2478 req->rq_state &= ~RQ_POSTPONED;
2479 __req_mod(req, NEG_ACKED, &m);
2480 spin_unlock_irq(&device->resource->req_lock);
2481 if (m.bio)
2482 complete_master_bio(device, &m);
2483 spin_lock_irq(&device->resource->req_lock);
2484 goto repeat;
/*
 * handle_write_conflicts() - check a peer write against overlapping writes.
 * @device:   device the write targets
 * @peer_req: freshly received peer request
 *
 * receive_Data() calls this with device->resource->req_lock held; note that
 * fail_postponed_requests(), used on one error path, drops and retakes that
 * lock.
 *
 * Return: 0 if the peer request may be submitted; -ENOENT if it was queued on
 * done_ee with an answer callback instead of being submitted; otherwise the
 * error returned by drbd_wait_misc().  On every non-zero return the interval
 * of @peer_req has been removed from the tree again.
 */
2488 static int handle_write_conflicts(struct drbd_device *device,
2489 struct drbd_peer_request *peer_req)
2491 struct drbd_connection *connection = peer_req->peer_device->connection;
2492 bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2493 sector_t sector = peer_req->i.sector;
2494 const unsigned int size = peer_req->i.size;
2495 struct drbd_interval *i;
2496 bool equal;
2497 int err;
2500 * Inserting the peer request into the write_requests tree will prevent
2501 * new conflicting local requests from being added.
2503 drbd_insert_interval(&device->write_requests, &peer_req->i);
/* The scan restarts from here after every wait. */
2505 repeat:
2506 drbd_for_each_overlap(i, &device->write_requests, sector, size) {
/* Ignore our own interval and intervals already flagged completed. */
2507 if (i == &peer_req->i)
2508 continue;
2509 if (i->completed)
2510 continue;
2512 if (!i->local) {
2514 * Our peer has sent a conflicting remote request; this
2515 * should not happen in a two-node setup. Wait for the
2516 * earlier peer request to complete.
2518 err = drbd_wait_misc(device, i);
2519 if (err)
2520 goto out;
2521 goto repeat;
/* From here on, i is a local request; equal means identical start and size. */
2524 equal = i->sector == sector && i->size == size;
2525 if (resolve_conflicts) {
2527 * If the peer request is fully contained within the
2528 * overlapping request, it can be considered overwritten
2529 * and thus superseded; otherwise, it will be retried
2530 * once all overlapping requests have completed.
2532 bool superseded = i->sector <= sector && i->sector +
2533 (i->size >> 9) >= sector + (size >> 9);
2535 if (!equal)
2536 drbd_alert(device, "Concurrent writes detected: "
2537 "local=%llus +%u, remote=%llus +%u, "
2538 "assuming %s came first\n",
2539 (unsigned long long)i->sector, i->size,
2540 (unsigned long long)sector, size,
2541 superseded ? "local" : "remote");
/* Do not submit: queue on done_ee so the ack sender answers the peer. */
2543 peer_req->w.cb = superseded ? e_send_superseded :
2544 e_send_retry_write;
2545 list_add_tail(&peer_req->w.list, &device->done_ee);
2546 queue_work(connection->ack_sender, &peer_req->peer_device->send_acks_work);
/* -ENOENT tells receive_Data() that the request has been taken over. */
2548 err = -ENOENT;
2549 goto out;
2550 } else {
2551 struct drbd_request *req =
2552 container_of(i, struct drbd_request, i);
2554 if (!equal)
2555 drbd_alert(device, "Concurrent writes detected: "
2556 "local=%llus +%u, remote=%llus +%u\n",
2557 (unsigned long long)i->sector, i->size,
2558 (unsigned long long)sector, size);
2560 if (req->rq_state & RQ_LOCAL_PENDING ||
2561 !(req->rq_state & RQ_POSTPONED)) {
2563 * Wait for the node with the discard flag to
2564 * decide if this request has been superseded
2565 * or needs to be retried.
2566 * Requests that have been superseded will
2567 * disappear from the write_requests tree.
2569 * In addition, wait for the conflicting
2570 * request to finish locally before submitting
2571 * the conflicting peer request.
2573 err = drbd_wait_misc(device, &req->i);
2574 if (err) {
/* Waiting failed: request C_TIMEOUT and fail postponed writes in this range. */
2575 _conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2576 fail_postponed_requests(device, sector, size);
2577 goto out;
2579 goto repeat;
2582 * Remember to restart the conflicting requests after
2583 * the new peer request has completed.
2585 peer_req->flags |= EE_RESTART_REQUESTS;
/* Scan finished without an unresolved conflict. */
2588 err = 0;
2590 out:
/* On any error the interval inserted at the top is taken out again. */
2591 if (err)
2592 drbd_remove_epoch_entry_interval(device, peer_req);
2593 return err;
2596 /* mirrored write */
/*
 * receive_Data() - handle a write sent by the peer.
 * @connection: connection the packet arrived on
 * @pi:         packet info; pi->data holds the struct p_data header
 *
 * Reads the payload into a peer request, accounts it in the current epoch,
 * sends or arms the acknowledgements selected by dp_flags, runs conflict
 * handling when two_primaries is configured, and submits the request with
 * drbd_submit_peer_request().  The P_TRIM and P_ZEROES commands are checked
 * here as well.
 *
 * Return: 0 on success, including the case where handle_write_conflicts()
 * returned -ENOENT; -EIO for an unknown volume or a failed read_in_block();
 * otherwise the error that aborted processing.
 */
2597 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2599 struct drbd_peer_device *peer_device;
2600 struct drbd_device *device;
2601 struct net_conf *nc;
2602 sector_t sector;
2603 struct drbd_peer_request *peer_req;
2604 struct p_data *p = pi->data;
2605 u32 peer_seq = be32_to_cpu(p->seq_num);
2606 int op, op_flags;
2607 u32 dp_flags;
2608 int err, tp;
2610 peer_device = conn_peer_device(connection, pi->vnr);
2611 if (!peer_device)
2612 return -EIO;
2613 device = peer_device->device;
/*
 * No local disk reference: still update the peer sequence number and the
 * epoch size, answer with P_NEG_ACK and drain the payload.  The first error
 * encountered is the one returned.
 */
2615 if (!get_ldev(device)) {
2616 int err2;
2618 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2619 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2620 atomic_inc(&connection->current_epoch->epoch_size);
2621 err2 = drbd_drain_block(peer_device, pi->size);
2622 if (!err)
2623 err = err2;
2624 return err;
2628 * Corresponding put_ldev done either below (on various errors), or in
2629 * drbd_peer_request_endio, if we successfully submit the data at the
2630 * end of this function.
2633 sector = be64_to_cpu(p->sector);
2634 peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2635 if (!peer_req) {
2636 put_ldev(device);
2637 return -EIO;
/* Completion runs e_end_block(); the request is marked as application IO. */
2640 peer_req->w.cb = e_end_block;
2641 peer_req->submit_jif = jiffies;
2642 peer_req->flags |= EE_APPLICATION;
/* Translate the wire flags into a bio operation and bio flags. */
2644 dp_flags = be32_to_cpu(p->dp_flags);
2645 op = wire_flags_to_bio_op(dp_flags);
2646 op_flags = wire_flags_to_bio_flags(dp_flags);
/* Per-command consistency checks; these commands carry no pages. */
2647 if (pi->cmd == P_TRIM) {
2648 D_ASSERT(peer_device, peer_req->i.size > 0);
2649 D_ASSERT(peer_device, op == REQ_OP_DISCARD);
2650 D_ASSERT(peer_device, peer_req->pages == NULL);
2651 /* need to play safe: an older DRBD sender
2652 * may mean zero-out while sending P_TRIM. */
2653 if (0 == (connection->agreed_features & DRBD_FF_WZEROES))
2654 peer_req->flags |= EE_ZEROOUT;
2655 } else if (pi->cmd == P_ZEROES) {
2656 D_ASSERT(peer_device, peer_req->i.size > 0);
2657 D_ASSERT(peer_device, op == REQ_OP_WRITE_ZEROES);
2658 D_ASSERT(peer_device, peer_req->pages == NULL);
2659 /* Do (not) pass down BLKDEV_ZERO_NOUNMAP? */
2660 if (dp_flags & DP_DISCARD)
2661 peer_req->flags |= EE_TRIM;
2662 } else if (peer_req->pages == NULL) {
2663 D_ASSERT(device, peer_req->i.size == 0);
2664 D_ASSERT(device, dp_flags & DP_FLUSH);
2667 if (dp_flags & DP_MAY_SET_IN_SYNC)
2668 peer_req->flags |= EE_MAY_SET_IN_SYNC;
/* Attach the request to the current epoch and count it there. */
2670 spin_lock(&connection->epoch_lock);
2671 peer_req->epoch = connection->current_epoch;
2672 atomic_inc(&peer_req->epoch->epoch_size);
2673 atomic_inc(&peer_req->epoch->active);
2674 spin_unlock(&connection->epoch_lock);
/*
 * Sample two_primaries from the RCU protected net_conf.  For an agreed
 * protocol version below 100 the ack flags are derived from the configured
 * wire protocol rather than taken from the packet alone.
 */
2676 rcu_read_lock();
2677 nc = rcu_dereference(peer_device->connection->net_conf);
2678 tp = nc->two_primaries;
2679 if (peer_device->connection->agreed_pro_version < 100) {
2680 switch (nc->wire_protocol) {
2681 case DRBD_PROT_C:
2682 dp_flags |= DP_SEND_WRITE_ACK;
2683 break;
2684 case DRBD_PROT_B:
2685 dp_flags |= DP_SEND_RECEIVE_ACK;
2686 break;
2689 rcu_read_unlock();
2691 if (dp_flags & DP_SEND_WRITE_ACK) {
2692 peer_req->flags |= EE_SEND_WRITE_ACK;
2693 inc_unacked(device);
2694 /* corresponding dec_unacked() in e_end_block()
2695 * respective _drbd_clear_done_ee */
2698 if (dp_flags & DP_SEND_RECEIVE_ACK) {
2699 /* I really don't like it that the receiver thread
2700 * sends on the msock, but anyways */
2701 drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
/*
 * Both branches below leave req_lock held on the way out, except for the
 * error exits of the two_primaries branch.
 */
2704 if (tp) {
2705 /* two primaries implies protocol C */
2706 D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
2707 peer_req->flags |= EE_IN_INTERVAL_TREE;
2708 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2709 if (err)
2710 goto out_interrupted;
2711 spin_lock_irq(&device->resource->req_lock);
2712 err = handle_write_conflicts(device, peer_req);
2713 if (err) {
2714 spin_unlock_irq(&device->resource->req_lock);
/* -ENOENT: the request was queued on done_ee; only drop our ldev reference. */
2715 if (err == -ENOENT) {
2716 put_ldev(device);
2717 return 0;
2719 goto out_interrupted;
2721 } else {
2722 update_peer_seq(peer_device, peer_seq);
2723 spin_lock_irq(&device->resource->req_lock);
2725 /* TRIM and WRITE_SAME are processed synchronously,
2726 * we wait for all pending requests, respectively wait for
2727 * active_ee to become empty in drbd_submit_peer_request();
2728 * better not add ourselves here. */
2729 if ((peer_req->flags & (EE_TRIM|EE_WRITE_SAME|EE_ZEROOUT)) == 0)
2730 list_add_tail(&peer_req->w.list, &device->active_ee);
2731 spin_unlock_irq(&device->resource->req_lock);
/* While in C_SYNC_TARGET, wait until no queued resync write overlaps us. */
2733 if (device->state.conn == C_SYNC_TARGET)
2734 wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2736 if (device->state.pdsk < D_INCONSISTENT) {
2737 /* In case we have the only disk of the cluster, */
2738 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2739 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2740 drbd_al_begin_io(device, &peer_req->i);
2741 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2744 err = drbd_submit_peer_request(device, peer_req, op, op_flags,
2745 DRBD_FAULT_DT_WR);
2746 if (!err)
2747 return 0;
/* Submit failed: undo list and interval insertion and the activity log reference. */
2749 /* don't care for the reason here */
2750 drbd_err(device, "submit failed, triggering re-connect\n");
2751 spin_lock_irq(&device->resource->req_lock);
2752 list_del(&peer_req->w.list);
2753 drbd_remove_epoch_entry_interval(device, peer_req);
2754 spin_unlock_irq(&device->resource->req_lock);
2755 if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
2756 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
2757 drbd_al_complete_io(device, &peer_req->i);
/* Common error exit: release the epoch reference, the ldev and the request. */
2760 out_interrupted:
2761 drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT | EV_CLEANUP);
2762 put_ldev(device);
2763 drbd_free_peer_req(device, peer_req);
2764 return err;
2767 /* We may throttle resync, if the lower device seems to be busy,
2768 * and current sync rate is above c_min_rate.
2770 * To decide whether or not the lower device is busy, we use a scheme similar
2771 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2772 * (more than 64 sectors) of activity we cannot account for with our own resync
2773 * activity, it obviously is "busy".
2775 * The current sync rate used here uses only the most recent two step marks,
2776 * to have a short time average so we can react faster.
2778 bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
2779 bool throttle_if_app_is_waiting)
2781 struct lc_element *tmp;
2782 bool throttle = drbd_rs_c_min_rate_throttle(device);
2784 if (!throttle || throttle_if_app_is_waiting)
2785 return throttle;
2787 spin_lock_irq(&device->al_lock);
2788 tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2789 if (tmp) {
2790 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2791 if (test_bit(BME_PRIORITY, &bm_ext->flags))
2792 throttle = false;
2793 /* Do not slow down if app IO is already waiting for this extent,
2794 * and our progress is necessary for application IO to complete. */
2796 spin_unlock_irq(&device->al_lock);
2798 return throttle;
2801 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2803 struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2804 unsigned long db, dt, dbdt;
2805 unsigned int c_min_rate;
2806 int curr_events;
2808 rcu_read_lock();
2809 c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2810 rcu_read_unlock();
2812 /* feature disabled? */
2813 if (c_min_rate == 0)
2814 return false;
2816 curr_events = (int)part_stat_read_accum(&disk->part0, sectors) -
2817 atomic_read(&device->rs_sect_ev);
2819 if (atomic_read(&device->ap_actlog_cnt)
2820 || curr_events - device->rs_last_events > 64) {
2821 unsigned long rs_left;
2822 int i;
2824 device->rs_last_events = curr_events;
2826 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2827 * approx. */
2828 i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2830 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2831 rs_left = device->ov_left;
2832 else
2833 rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2835 dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2836 if (!dt)
2837 dt++;
2838 db = device->rs_mark_left[i] - rs_left;
2839 dbdt = Bit2KB(db/dt);
2841 if (dbdt > c_min_rate)
2842 return true;
2844 return false;
/*
 * receive_DataRequest() - handle a read-type request from the peer.
 * @connection: connection the packet arrived on
 * @pi:         packet info; pi->data holds the struct p_block_req header
 *
 * Serves P_DATA_REQUEST, P_RS_THIN_REQ, P_RS_DATA_REQUEST, P_CSUM_RS_REQUEST,
 * P_OV_REQUEST and P_OV_REPLY; any other command hits BUG().  After
 * validating sector and size, a peer request is allocated, given the
 * completion callback matching the command, and submitted as a REQ_OP_READ.
 *
 * Return: 0 on success; -EIO for an unknown volume or any failure after the
 * peer request was allocated; -EINVAL for an invalid size or a range beyond
 * the device capacity; -ENOMEM if the peer request cannot be allocated; the
 * result of drbd_drain_block() when there is no up-to-date local disk.
 */
2847 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2849 struct drbd_peer_device *peer_device;
2850 struct drbd_device *device;
2851 sector_t sector;
2852 sector_t capacity;
2853 struct drbd_peer_request *peer_req;
2854 struct digest_info *di = NULL;
2855 int size, verb;
2856 unsigned int fault_type;
2857 struct p_block_req *p = pi->data;
2859 peer_device = conn_peer_device(connection, pi->vnr);
2860 if (!peer_device)
2861 return -EIO;
2862 device = peer_device->device;
2863 capacity = drbd_get_capacity(device->this_bdev);
2865 sector = be64_to_cpu(p->sector);
2866 size = be32_to_cpu(p->blksize);
/* Size must be positive, a multiple of 512 and at most DRBD_MAX_BIO_SIZE. */
2868 if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2869 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2870 (unsigned long long)sector, size);
2871 return -EINVAL;
/* The requested range must end within the device capacity. */
2873 if (sector + (size>>9) > capacity) {
2874 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2875 (unsigned long long)sector, size);
2876 return -EINVAL;
/*
 * No local disk in D_UP_TO_DATE state: send the negative reply matching the
 * command, optionally log (rate limited, and not for P_OV_REPLY), then drain
 * any payload from the socket.
 */
2879 if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2880 verb = 1;
2881 switch (pi->cmd) {
2882 case P_DATA_REQUEST:
2883 drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2884 break;
2885 case P_RS_THIN_REQ:
2886 case P_RS_DATA_REQUEST:
2887 case P_CSUM_RS_REQUEST:
2888 case P_OV_REQUEST:
2889 drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
2890 break;
2891 case P_OV_REPLY:
2892 verb = 0;
2893 dec_rs_pending(device);
2894 drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2895 break;
2896 default:
2897 BUG();
2899 if (verb && __ratelimit(&drbd_ratelimit_state))
2900 drbd_err(device, "Can not satisfy peer's read request, "
2901 "no local data.\n");
2903 /* drain possibly payload */
2904 return drbd_drain_block(peer_device, pi->size);
2907 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2908 * "criss-cross" setup, that might cause write-out on some other DRBD,
2909 * which in turn might block on the other node at this very place. */
2910 peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2911 size, GFP_NOIO);
2912 if (!peer_req) {
2913 put_ldev(device);
2914 return -ENOMEM;
/* Select the completion callback and fault-injection type per command. */
2917 switch (pi->cmd) {
2918 case P_DATA_REQUEST:
2919 peer_req->w.cb = w_e_end_data_req;
2920 fault_type = DRBD_FAULT_DT_RD;
2921 /* application IO, don't drbd_rs_begin_io */
2922 peer_req->flags |= EE_APPLICATION;
/*
 * NOTE(review): this jump skips the list_add_tail() onto read_ee further
 * down, so the request is not on that list when submitted - confirm that
 * drbd_submit_peer_request() and the completion path are fine with it.
 */
2923 goto submit;
2925 case P_RS_THIN_REQ:
2926 /* If at some point in the future we have a smart way to
2927 find out if this data block is completely deallocated,
2928 then we would do something smarter here than reading
2929 the block... */
2930 peer_req->flags |= EE_RS_THIN_REQ;
2931 /* fall through */
2932 case P_RS_DATA_REQUEST:
2933 peer_req->w.cb = w_e_end_rsdata_req;
2934 fault_type = DRBD_FAULT_RS_RD;
2935 /* used in the sector offset progress display */
2936 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2937 break;
2939 case P_OV_REPLY:
2940 case P_CSUM_RS_REQUEST:
2941 fault_type = DRBD_FAULT_RS_RD;
/* Both commands carry a digest of pi->size bytes, stored right behind *di. */
2942 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2943 if (!di)
2944 goto out_free_e;
2946 di->digest_size = pi->size;
2947 di->digest = (((char *)di)+sizeof(struct digest_info));
2949 peer_req->digest = di;
2950 peer_req->flags |= EE_HAS_DIGEST;
2952 if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2953 goto out_free_e;
2955 if (pi->cmd == P_CSUM_RS_REQUEST) {
2956 D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2957 peer_req->w.cb = w_e_end_csum_rs_req;
2958 /* used in the sector offset progress display */
2959 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2960 /* remember to report stats in drbd_resync_finished */
2961 device->use_csums = true;
2962 } else if (pi->cmd == P_OV_REPLY) {
2963 /* track progress, we may need to throttle */
2964 atomic_add(size >> 9, &device->rs_sect_in);
2965 peer_req->w.cb = w_e_end_ov_reply;
2966 dec_rs_pending(device);
2967 /* drbd_rs_begin_io done when we sent this request,
2968 * but accounting still needs to be done. */
2969 goto submit_for_resync;
2971 break;
2973 case P_OV_REQUEST:
/* First request of a verify run with protocol >= 90: initialise progress tracking. */
2974 if (device->ov_start_sector == ~(sector_t)0 &&
2975 peer_device->connection->agreed_pro_version >= 90) {
2976 unsigned long now = jiffies;
2977 int i;
2978 device->ov_start_sector = sector;
2979 device->ov_position = sector;
2980 device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2981 device->rs_total = device->ov_left;
2982 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2983 device->rs_mark_left[i] = device->ov_left;
2984 device->rs_mark_time[i] = now;
2986 drbd_info(device, "Online Verify start sector: %llu\n",
2987 (unsigned long long)sector);
2989 peer_req->w.cb = w_e_end_ov_req;
2990 fault_type = DRBD_FAULT_RS_RD;
2991 break;
2993 default:
2994 BUG();
2997 /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2998 * wrt the receiver, but it is not as straightforward as it may seem.
2999 * Various places in the resync start and stop logic assume resync
3000 * requests are processed in order, requeuing this on the worker thread
3001 * introduces a bunch of new code for synchronization between threads.
3003 * Unlimited throttling before drbd_rs_begin_io may stall the resync
3004 * "forever", throttling after drbd_rs_begin_io will lock that extent
3005 * for application writes for the same time. For now, just throttle
3006 * here, where the rest of the code expects the receiver to sleep for
3007 * a while, anyways.
3010 /* Throttle before drbd_rs_begin_io, as that locks out application IO;
3011 * this defers syncer requests for some time, before letting at least
3012 * on request through. The resync controller on the receiving side
3013 * will adapt to the incoming rate accordingly.
3015 * We cannot throttle here if remote is Primary/SyncTarget:
3016 * we would also throttle its application reads.
3017 * In that case, throttling is done on the SyncTarget only.
3020 /* Even though this may be a resync request, we do add to "read_ee";
3021 * "sync_ee" is only used for resync WRITEs.
3022 * Add to list early, so debugfs can find this request
3023 * even if we have to sleep below. */
3024 spin_lock_irq(&device->resource->req_lock);
3025 list_add_tail(&peer_req->w.list, &device->read_ee);
3026 spin_unlock_irq(&device->resource->req_lock);
/* Sleep HZ/10 once if throttling applies and the peer is not Primary. */
3028 update_receiver_timing_details(connection, drbd_rs_should_slow_down);
3029 if (device->state.peer != R_PRIMARY
3030 && drbd_rs_should_slow_down(device, sector, false))
3031 schedule_timeout_uninterruptible(HZ/10);
3032 update_receiver_timing_details(connection, drbd_rs_begin_io);
3033 if (drbd_rs_begin_io(device, sector))
3034 goto out_free_e;
/* Entry point for P_OV_REPLY: account the sectors as resync events. */
3036 submit_for_resync:
3037 atomic_add(size >> 9, &device->rs_sect_ev);
/* Entry point for P_DATA_REQUEST: straight to submission. */
3039 submit:
3040 update_receiver_timing_details(connection, drbd_submit_peer_request);
3041 inc_unacked(device);
3042 if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
3043 fault_type) == 0)
3044 return 0;
3046 /* don't care for the reason here */
3047 drbd_err(device, "submit failed, triggering re-connect\n");
/*
 * Common failure exit, also reached before the request was put on read_ee
 * (digest allocation or receive failure).
 * NOTE(review): list_del() then relies on w.list having been initialised by
 * drbd_alloc_peer_req() - confirm.
 */
3049 out_free_e:
3050 spin_lock_irq(&device->resource->req_lock);
3051 list_del(&peer_req->w.list);
3052 spin_unlock_irq(&device->resource->req_lock);
3053 /* no drbd_rs_complete_io(), we are dropping the connection anyways */
3055 put_ldev(device);
3056 drbd_free_peer_req(device, peer_req);
3057 return -EIO;
3061 * drbd_asb_recover_0p - Recover after split-brain with no remaining primaries
/*
 * Applies the after_sb_0p policy from net_conf.
 *
 * Return: -100 if the policy yields no decision (ASB_DISCONNECT, or a policy
 * that is reported as a configuration error here), otherwise -1 or 1.
 * Judging by the ASB_DISCARD_LOCAL and ASB_DISCARD_REMOTE cases, -1 stands
 * for discarding the local data and 1 for discarding the peer's.
 */
3063 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
3065 struct drbd_device *device = peer_device->device;
3066 int self, peer, rv = -100;
3067 unsigned long ch_self, ch_peer;
3068 enum drbd_after_sb_p after_sb_0p;
/* Lowest bit of the bitmap UUID of each side; used by the younger/older policies. */
3070 self = device->ldev->md.uuid[UI_BITMAP] & 1;
3071 peer = device->p_uuid[UI_BITMAP] & 1;
/*
 * Inputs of the zero/least-changes policies.
 * NOTE(review): presumably the amount of changed data on each side - confirm
 * what p_uuid[UI_SIZE] and comm_bm_set hold.
 */
3073 ch_peer = device->p_uuid[UI_SIZE];
3074 ch_self = device->comm_bm_set;
3076 rcu_read_lock();
3077 after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
3078 rcu_read_unlock();
3079 switch (after_sb_0p) {
/* These policy values are rejected for the zero-primaries case. */
3080 case ASB_CONSENSUS:
3081 case ASB_DISCARD_SECONDARY:
3082 case ASB_CALL_HELPER:
3083 case ASB_VIOLENTLY:
3084 drbd_err(device, "Configuration error.\n");
3085 break;
3086 case ASB_DISCONNECT:
3087 break;
3088 case ASB_DISCARD_YOUNGER_PRI:
3089 if (self == 0 && peer == 1) {
3090 rv = -1;
3091 break;
3093 if (self == 1 && peer == 0) {
3094 rv = 1;
3095 break;
3097 /* Else fall through to one of the other strategies... */
/* Same test as above with the opposite verdicts. */
3098 case ASB_DISCARD_OLDER_PRI:
3099 if (self == 0 && peer == 1) {
3100 rv = 1;
3101 break;
3103 if (self == 1 && peer == 0) {
3104 rv = -1;
3105 break;
3107 /* Else fall through to one of the other strategies... */
3108 drbd_warn(device, "Discard younger/older primary did not find a decision\n"
3109 "Using discard-least-changes instead\n");
3110 /* fall through */
3111 case ASB_DISCARD_ZERO_CHG:
/* Neither side changed anything: the RESOLVE_CONFLICTS flag breaks the tie. */
3112 if (ch_peer == 0 && ch_self == 0) {
3113 rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
3114 ? -1 : 1;
3115 break;
3116 } else {
3117 if (ch_peer == 0) { rv = 1; break; }
3118 if (ch_self == 0) { rv = -1; break; }
/* Only a policy that fell through from above continues into least-changes. */
3120 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
3121 break;
3122 /* else: fall through */
3123 case ASB_DISCARD_LEAST_CHG:
3124 if (ch_self < ch_peer)
3125 rv = -1;
3126 else if (ch_self > ch_peer)
3127 rv = 1;
3128 else /* ( ch_self == ch_peer ) */
3129 /* Well, then use something else. */
3130 rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
3131 ? -1 : 1;
3132 break;
3133 case ASB_DISCARD_LOCAL:
3134 rv = -1;
3135 break;
3136 case ASB_DISCARD_REMOTE:
3137 rv = 1;
3140 return rv;
3144 * drbd_asb_recover_1p - Recover after split-brain with one remaining primary
3146 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
3148 struct drbd_device *device = peer_device->device;
3149 int hg, rv = -100;
3150 enum drbd_after_sb_p after_sb_1p;
3152 rcu_read_lock();
3153 after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
3154 rcu_read_unlock();
3155 switch (after_sb_1p) {
3156 case ASB_DISCARD_YOUNGER_PRI:
3157 case ASB_DISCARD_OLDER_PRI:
3158 case ASB_DISCARD_LEAST_CHG:
3159 case ASB_DISCARD_LOCAL:
3160 case ASB_DISCARD_REMOTE:
3161 case ASB_DISCARD_ZERO_CHG:
3162 drbd_err(device, "Configuration error.\n");
3163 break;
3164 case ASB_DISCONNECT:
3165 break;
3166 case ASB_CONSENSUS:
3167 hg = drbd_asb_recover_0p(peer_device);
3168 if (hg == -1 && device->state.role == R_SECONDARY)
3169 rv = hg;
3170 if (hg == 1 && device->state.role == R_PRIMARY)
3171 rv = hg;
3172 break;
3173 case ASB_VIOLENTLY:
3174 rv = drbd_asb_recover_0p(peer_device);
3175 break;
3176 case ASB_DISCARD_SECONDARY:
3177 return device->state.role == R_PRIMARY ? 1 : -1;
3178 case ASB_CALL_HELPER:
3179 hg = drbd_asb_recover_0p(peer_device);
3180 if (hg == -1 && device->state.role == R_PRIMARY) {
3181 enum drbd_state_rv rv2;
3183 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3184 * we might be here in C_WF_REPORT_PARAMS which is transient.
3185 * we do not need to wait for the after state change work either. */
3186 rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3187 if (rv2 != SS_SUCCESS) {
3188 drbd_khelper(device, "pri-lost-after-sb");
3189 } else {
3190 drbd_warn(device, "Successfully gave up primary role.\n");
3191 rv = hg;
3193 } else
3194 rv = hg;
3197 return rv;
3201 * drbd_asb_recover_2p - Recover after split-brain with two remaining primaries
3203 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
3205 struct drbd_device *device = peer_device->device;
3206 int hg, rv = -100;
3207 enum drbd_after_sb_p after_sb_2p;
3209 rcu_read_lock();
3210 after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
3211 rcu_read_unlock();
3212 switch (after_sb_2p) {
3213 case ASB_DISCARD_YOUNGER_PRI:
3214 case ASB_DISCARD_OLDER_PRI:
3215 case ASB_DISCARD_LEAST_CHG:
3216 case ASB_DISCARD_LOCAL:
3217 case ASB_DISCARD_REMOTE:
3218 case ASB_CONSENSUS:
3219 case ASB_DISCARD_SECONDARY:
3220 case ASB_DISCARD_ZERO_CHG:
3221 drbd_err(device, "Configuration error.\n");
3222 break;
3223 case ASB_VIOLENTLY:
3224 rv = drbd_asb_recover_0p(peer_device);
3225 break;
3226 case ASB_DISCONNECT:
3227 break;
3228 case ASB_CALL_HELPER:
3229 hg = drbd_asb_recover_0p(peer_device);
3230 if (hg == -1) {
3231 enum drbd_state_rv rv2;
3233 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3234 * we might be here in C_WF_REPORT_PARAMS which is transient.
3235 * we do not need to wait for the after state change work either. */
3236 rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3237 if (rv2 != SS_SUCCESS) {
3238 drbd_khelper(device, "pri-lost-after-sb");
3239 } else {
3240 drbd_warn(device, "Successfully gave up primary role.\n");
3241 rv = hg;
3243 } else
3244 rv = hg;
3247 return rv;
3250 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
3251 u64 bits, u64 flags)
3253 if (!uuid) {
3254 drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
3255 return;
3257 drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
3258 text,
3259 (unsigned long long)uuid[UI_CURRENT],
3260 (unsigned long long)uuid[UI_BITMAP],
3261 (unsigned long long)uuid[UI_HISTORY_START],
3262 (unsigned long long)uuid[UI_HISTORY_END],
3263 (unsigned long long)bits,
3264 (unsigned long long)flags);
3268 100 after split brain try auto recover
3269 2 C_SYNC_SOURCE set BitMap
3270 1 C_SYNC_SOURCE use BitMap
3271 0 no Sync
3272 -1 C_SYNC_TARGET use BitMap
3273 -2 C_SYNC_TARGET set BitMap
3274 -100 after split brain, disconnect
3275 -1000 unrelated data
3276 -1091 requires proto 91
3277 -1096 requires proto 96
3280 static int drbd_uuid_compare(struct drbd_device *const device, enum drbd_role const peer_role, int *rule_nr) __must_hold(local)
3282 struct drbd_peer_device *const peer_device = first_peer_device(device);
3283 struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
3284 u64 self, peer;
3285 int i, j;
3287 self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3288 peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3290 *rule_nr = 10;
3291 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
3292 return 0;
3294 *rule_nr = 20;
3295 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
3296 peer != UUID_JUST_CREATED)
3297 return -2;
3299 *rule_nr = 30;
3300 if (self != UUID_JUST_CREATED &&
3301 (peer == UUID_JUST_CREATED || peer == (u64)0))
3302 return 2;
3304 if (self == peer) {
3305 int rct, dc; /* roles at crash time */
3307 if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
3309 if (connection->agreed_pro_version < 91)
3310 return -1091;
3312 if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
3313 (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
3314 drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
3315 drbd_uuid_move_history(device);
3316 device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
3317 device->ldev->md.uuid[UI_BITMAP] = 0;
3319 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3320 device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3321 *rule_nr = 34;
3322 } else {
3323 drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
3324 *rule_nr = 36;
3327 return 1;
3330 if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
3332 if (connection->agreed_pro_version < 91)
3333 return -1091;
3335 if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
3336 (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
3337 drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
3339 device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
3340 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
3341 device->p_uuid[UI_BITMAP] = 0UL;
3343 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3344 *rule_nr = 35;
3345 } else {
3346 drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
3347 *rule_nr = 37;
3350 return -1;
3353 /* Common power [off|failure] */
3354 rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
3355 (device->p_uuid[UI_FLAGS] & 2);
3356 /* lowest bit is set when we were primary,
3357 * next bit (weight 2) is set when peer was primary */
3358 *rule_nr = 40;
3360 /* Neither has the "crashed primary" flag set,
3361 * only a replication link hickup. */
3362 if (rct == 0)
3363 return 0;
3365 /* Current UUID equal and no bitmap uuid; does not necessarily
3366 * mean this was a "simultaneous hard crash", maybe IO was
3367 * frozen, so no UUID-bump happened.
3368 * This is a protocol change, overload DRBD_FF_WSAME as flag
3369 * for "new-enough" peer DRBD version. */
3370 if (device->state.role == R_PRIMARY || peer_role == R_PRIMARY) {
3371 *rule_nr = 41;
3372 if (!(connection->agreed_features & DRBD_FF_WSAME)) {
3373 drbd_warn(peer_device, "Equivalent unrotated UUIDs, but current primary present.\n");
3374 return -(0x10000 | PRO_VERSION_MAX | (DRBD_FF_WSAME << 8));
3376 if (device->state.role == R_PRIMARY && peer_role == R_PRIMARY) {
3377 /* At least one has the "crashed primary" bit set,
3378 * both are primary now, but neither has rotated its UUIDs?
3379 * "Can not happen." */
3380 drbd_err(peer_device, "Equivalent unrotated UUIDs, but both are primary. Can not resolve this.\n");
3381 return -100;
3383 if (device->state.role == R_PRIMARY)
3384 return 1;
3385 return -1;
3388 /* Both are secondary.
3389 * Really looks like recovery from simultaneous hard crash.
3390 * Check which had been primary before, and arbitrate. */
3391 switch (rct) {
3392 case 0: /* !self_pri && !peer_pri */ return 0; /* already handled */
3393 case 1: /* self_pri && !peer_pri */ return 1;
3394 case 2: /* !self_pri && peer_pri */ return -1;
3395 case 3: /* self_pri && peer_pri */
3396 dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
3397 return dc ? -1 : 1;
3401 *rule_nr = 50;
3402 peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3403 if (self == peer)
3404 return -1;
3406 *rule_nr = 51;
3407 peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
3408 if (self == peer) {
3409 if (connection->agreed_pro_version < 96 ?
3410 (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
3411 (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
3412 peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
3413 /* The last P_SYNC_UUID did not get though. Undo the last start of
3414 resync as sync source modifications of the peer's UUIDs. */
3416 if (connection->agreed_pro_version < 91)
3417 return -1091;
3419 device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
3420 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
3422 drbd_info(device, "Lost last syncUUID packet, corrected:\n");
3423 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3425 return -1;
3429 *rule_nr = 60;
3430 self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3431 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3432 peer = device->p_uuid[i] & ~((u64)1);
3433 if (self == peer)
3434 return -2;
3437 *rule_nr = 70;
3438 self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3439 peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3440 if (self == peer)
3441 return 1;
3443 *rule_nr = 71;
3444 self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
3445 if (self == peer) {
3446 if (connection->agreed_pro_version < 96 ?
3447 (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
3448 (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
3449 self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
3450 /* The last P_SYNC_UUID did not get though. Undo the last start of
3451 resync as sync source modifications of our UUIDs. */
3453 if (connection->agreed_pro_version < 91)
3454 return -1091;
3456 __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
3457 __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
3459 drbd_info(device, "Last syncUUID did not get through, corrected:\n");
3460 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3461 device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3463 return 1;
3468 *rule_nr = 80;
3469 peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3470 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3471 self = device->ldev->md.uuid[i] & ~((u64)1);
3472 if (self == peer)
3473 return 2;
3476 *rule_nr = 90;
3477 self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3478 peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3479 if (self == peer && self != ((u64)0))
3480 return 100;
3482 *rule_nr = 100;
3483 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3484 self = device->ldev->md.uuid[i] & ~((u64)1);
3485 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3486 peer = device->p_uuid[j] & ~((u64)1);
3487 if (self == peer)
3488 return -100;
3492 return -1000;
3495 /* drbd_sync_handshake() returns the new conn state on success, or
3496 CONN_MASK (-1) on failure.
3498 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3499 enum drbd_role peer_role,
3500 enum drbd_disk_state peer_disk) __must_hold(local)
3502 struct drbd_device *device = peer_device->device;
3503 enum drbd_conns rv = C_MASK;
3504 enum drbd_disk_state mydisk;
3505 struct net_conf *nc;
3506 int hg, rule_nr, rr_conflict, tentative, always_asbp;
3508 mydisk = device->state.disk;
3509 if (mydisk == D_NEGOTIATING)
3510 mydisk = device->new_state_tmp.disk;
3512 drbd_info(device, "drbd_sync_handshake:\n");
3514 spin_lock_irq(&device->ldev->md.uuid_lock);
3515 drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3516 drbd_uuid_dump(device, "peer", device->p_uuid,
3517 device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3519 hg = drbd_uuid_compare(device, peer_role, &rule_nr);
3520 spin_unlock_irq(&device->ldev->md.uuid_lock);
3522 drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3524 if (hg == -1000) {
3525 drbd_alert(device, "Unrelated data, aborting!\n");
3526 return C_MASK;
3528 if (hg < -0x10000) {
3529 int proto, fflags;
3530 hg = -hg;
3531 proto = hg & 0xff;
3532 fflags = (hg >> 8) & 0xff;
3533 drbd_alert(device, "To resolve this both sides have to support at least protocol %d and feature flags 0x%x\n",
3534 proto, fflags);
3535 return C_MASK;
3537 if (hg < -1000) {
3538 drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3539 return C_MASK;
3542 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3543 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
3544 int f = (hg == -100) || abs(hg) == 2;
3545 hg = mydisk > D_INCONSISTENT ? 1 : -1;
3546 if (f)
3547 hg = hg*2;
3548 drbd_info(device, "Becoming sync %s due to disk states.\n",
3549 hg > 0 ? "source" : "target");
3552 if (abs(hg) == 100)
3553 drbd_khelper(device, "initial-split-brain");
3555 rcu_read_lock();
3556 nc = rcu_dereference(peer_device->connection->net_conf);
3557 always_asbp = nc->always_asbp;
3558 rr_conflict = nc->rr_conflict;
3559 tentative = nc->tentative;
3560 rcu_read_unlock();
3562 if (hg == 100 || (hg == -100 && always_asbp)) {
3563 int pcount = (device->state.role == R_PRIMARY)
3564 + (peer_role == R_PRIMARY);
3565 int forced = (hg == -100);
3567 switch (pcount) {
3568 case 0:
3569 hg = drbd_asb_recover_0p(peer_device);
3570 break;
3571 case 1:
3572 hg = drbd_asb_recover_1p(peer_device);
3573 break;
3574 case 2:
3575 hg = drbd_asb_recover_2p(peer_device);
3576 break;
3578 if (abs(hg) < 100) {
3579 drbd_warn(device, "Split-Brain detected, %d primaries, "
3580 "automatically solved. Sync from %s node\n",
3581 pcount, (hg < 0) ? "peer" : "this");
3582 if (forced) {
3583 drbd_warn(device, "Doing a full sync, since"
3584 " UUIDs where ambiguous.\n");
3585 hg = hg*2;
3590 if (hg == -100) {
3591 if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3592 hg = -1;
3593 if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3594 hg = 1;
3596 if (abs(hg) < 100)
3597 drbd_warn(device, "Split-Brain detected, manually solved. "
3598 "Sync from %s node\n",
3599 (hg < 0) ? "peer" : "this");
3602 if (hg == -100) {
3603 /* FIXME this log message is not correct if we end up here
3604 * after an attempted attach on a diskless node.
3605 * We just refuse to attach -- well, we drop the "connection"
3606 * to that disk, in a way... */
3607 drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3608 drbd_khelper(device, "split-brain");
3609 return C_MASK;
3612 if (hg > 0 && mydisk <= D_INCONSISTENT) {
3613 drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3614 return C_MASK;
3617 if (hg < 0 && /* by intention we do not use mydisk here. */
3618 device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3619 switch (rr_conflict) {
3620 case ASB_CALL_HELPER:
3621 drbd_khelper(device, "pri-lost");
3622 /* fall through */
3623 case ASB_DISCONNECT:
3624 drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3625 return C_MASK;
3626 case ASB_VIOLENTLY:
3627 drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
3628 "assumption\n");
3632 if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3633 if (hg == 0)
3634 drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3635 else
3636 drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
3637 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3638 abs(hg) >= 2 ? "full" : "bit-map based");
3639 return C_MASK;
3642 if (abs(hg) >= 2) {
3643 drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3644 if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3645 BM_LOCKED_SET_ALLOWED))
3646 return C_MASK;
3649 if (hg > 0) { /* become sync source. */
3650 rv = C_WF_BITMAP_S;
3651 } else if (hg < 0) { /* become sync target */
3652 rv = C_WF_BITMAP_T;
3653 } else {
3654 rv = C_CONNECTED;
3655 if (drbd_bm_total_weight(device)) {
3656 drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3657 drbd_bm_total_weight(device));
3661 return rv;
3664 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3666 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3667 if (peer == ASB_DISCARD_REMOTE)
3668 return ASB_DISCARD_LOCAL;
3670 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3671 if (peer == ASB_DISCARD_LOCAL)
3672 return ASB_DISCARD_REMOTE;
3674 /* everything else is valid if they are equal on both sides. */
3675 return peer;
3678 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3680 struct p_protocol *p = pi->data;
3681 enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3682 int p_proto, p_discard_my_data, p_two_primaries, cf;
3683 struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3684 char integrity_alg[SHARED_SECRET_MAX] = "";
3685 struct crypto_shash *peer_integrity_tfm = NULL;
3686 void *int_dig_in = NULL, *int_dig_vv = NULL;
3688 p_proto = be32_to_cpu(p->protocol);
3689 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
3690 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
3691 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
3692 p_two_primaries = be32_to_cpu(p->two_primaries);
3693 cf = be32_to_cpu(p->conn_flags);
3694 p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3696 if (connection->agreed_pro_version >= 87) {
3697 int err;
3699 if (pi->size > sizeof(integrity_alg))
3700 return -EIO;
3701 err = drbd_recv_all(connection, integrity_alg, pi->size);
3702 if (err)
3703 return err;
3704 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3707 if (pi->cmd != P_PROTOCOL_UPDATE) {
3708 clear_bit(CONN_DRY_RUN, &connection->flags);
3710 if (cf & CF_DRY_RUN)
3711 set_bit(CONN_DRY_RUN, &connection->flags);
3713 rcu_read_lock();
3714 nc = rcu_dereference(connection->net_conf);
3716 if (p_proto != nc->wire_protocol) {
3717 drbd_err(connection, "incompatible %s settings\n", "protocol");
3718 goto disconnect_rcu_unlock;
3721 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3722 drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3723 goto disconnect_rcu_unlock;
3726 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3727 drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3728 goto disconnect_rcu_unlock;
3731 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3732 drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3733 goto disconnect_rcu_unlock;
3736 if (p_discard_my_data && nc->discard_my_data) {
3737 drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3738 goto disconnect_rcu_unlock;
3741 if (p_two_primaries != nc->two_primaries) {
3742 drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3743 goto disconnect_rcu_unlock;
3746 if (strcmp(integrity_alg, nc->integrity_alg)) {
3747 drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3748 goto disconnect_rcu_unlock;
3751 rcu_read_unlock();
3754 if (integrity_alg[0]) {
3755 int hash_size;
3758 * We can only change the peer data integrity algorithm
3759 * here. Changing our own data integrity algorithm
3760 * requires that we send a P_PROTOCOL_UPDATE packet at
3761 * the same time; otherwise, the peer has no way to
3762 * tell between which packets the algorithm should
3763 * change.
3766 peer_integrity_tfm = crypto_alloc_shash(integrity_alg, 0, 0);
3767 if (IS_ERR(peer_integrity_tfm)) {
3768 peer_integrity_tfm = NULL;
3769 drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3770 integrity_alg);
3771 goto disconnect;
3774 hash_size = crypto_shash_digestsize(peer_integrity_tfm);
3775 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3776 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3777 if (!(int_dig_in && int_dig_vv)) {
3778 drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3779 goto disconnect;
3783 new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3784 if (!new_net_conf) {
3785 drbd_err(connection, "Allocation of new net_conf failed\n");
3786 goto disconnect;
3789 mutex_lock(&connection->data.mutex);
3790 mutex_lock(&connection->resource->conf_update);
3791 old_net_conf = connection->net_conf;
3792 *new_net_conf = *old_net_conf;
3794 new_net_conf->wire_protocol = p_proto;
3795 new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3796 new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3797 new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3798 new_net_conf->two_primaries = p_two_primaries;
3800 rcu_assign_pointer(connection->net_conf, new_net_conf);
3801 mutex_unlock(&connection->resource->conf_update);
3802 mutex_unlock(&connection->data.mutex);
3804 crypto_free_shash(connection->peer_integrity_tfm);
3805 kfree(connection->int_dig_in);
3806 kfree(connection->int_dig_vv);
3807 connection->peer_integrity_tfm = peer_integrity_tfm;
3808 connection->int_dig_in = int_dig_in;
3809 connection->int_dig_vv = int_dig_vv;
3811 if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3812 drbd_info(connection, "peer data-integrity-alg: %s\n",
3813 integrity_alg[0] ? integrity_alg : "(none)");
3815 synchronize_rcu();
3816 kfree(old_net_conf);
3817 return 0;
3819 disconnect_rcu_unlock:
3820 rcu_read_unlock();
3821 disconnect:
3822 crypto_free_shash(peer_integrity_tfm);
3823 kfree(int_dig_in);
3824 kfree(int_dig_vv);
3825 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3826 return -EIO;
3829 /* helper function
3830 * input: alg name, feature name
3831 * return: NULL (alg name was "")
3832 * ERR_PTR(error) if something goes wrong
3833 * or the crypto hash ptr, if it worked out ok. */
3834 static struct crypto_shash *drbd_crypto_alloc_digest_safe(
3835 const struct drbd_device *device,
3836 const char *alg, const char *name)
3838 struct crypto_shash *tfm;
3840 if (!alg[0])
3841 return NULL;
3843 tfm = crypto_alloc_shash(alg, 0, 0);
3844 if (IS_ERR(tfm)) {
3845 drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3846 alg, name, PTR_ERR(tfm));
3847 return tfm;
3849 return tfm;
3852 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3854 void *buffer = connection->data.rbuf;
3855 int size = pi->size;
3857 while (size) {
3858 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3859 s = drbd_recv(connection, buffer, s);
3860 if (s <= 0) {
3861 if (s < 0)
3862 return s;
3863 break;
3865 size -= s;
3867 if (size)
3868 return -EIO;
3869 return 0;
3873 * config_unknown_volume - device configuration command for unknown volume
3875 * When a device is added to an existing connection, the node on which the
3876 * device is added first will send configuration commands to its peer but the
3877 * peer will not know about the device yet. It will warn and ignore these
3878 * commands. Once the device is added on the second node, the second node will
3879 * send the same device configuration commands, but in the other direction.
3881 * (We can also end up here if drbd is misconfigured.)
3883 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3885 drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3886 cmdname(pi->cmd), pi->vnr);
3887 return ignore_remaining_packet(connection, pi);
3890 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3892 struct drbd_peer_device *peer_device;
3893 struct drbd_device *device;
3894 struct p_rs_param_95 *p;
3895 unsigned int header_size, data_size, exp_max_sz;
3896 struct crypto_shash *verify_tfm = NULL;
3897 struct crypto_shash *csums_tfm = NULL;
3898 struct net_conf *old_net_conf, *new_net_conf = NULL;
3899 struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3900 const int apv = connection->agreed_pro_version;
3901 struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3902 int fifo_size = 0;
3903 int err;
3905 peer_device = conn_peer_device(connection, pi->vnr);
3906 if (!peer_device)
3907 return config_unknown_volume(connection, pi);
3908 device = peer_device->device;
3910 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
3911 : apv == 88 ? sizeof(struct p_rs_param)
3912 + SHARED_SECRET_MAX
3913 : apv <= 94 ? sizeof(struct p_rs_param_89)
3914 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3916 if (pi->size > exp_max_sz) {
3917 drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3918 pi->size, exp_max_sz);
3919 return -EIO;
3922 if (apv <= 88) {
3923 header_size = sizeof(struct p_rs_param);
3924 data_size = pi->size - header_size;
3925 } else if (apv <= 94) {
3926 header_size = sizeof(struct p_rs_param_89);
3927 data_size = pi->size - header_size;
3928 D_ASSERT(device, data_size == 0);
3929 } else {
3930 header_size = sizeof(struct p_rs_param_95);
3931 data_size = pi->size - header_size;
3932 D_ASSERT(device, data_size == 0);
3935 /* initialize verify_alg and csums_alg */
3936 p = pi->data;
3937 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3939 err = drbd_recv_all(peer_device->connection, p, header_size);
3940 if (err)
3941 return err;
3943 mutex_lock(&connection->resource->conf_update);
3944 old_net_conf = peer_device->connection->net_conf;
3945 if (get_ldev(device)) {
3946 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3947 if (!new_disk_conf) {
3948 put_ldev(device);
3949 mutex_unlock(&connection->resource->conf_update);
3950 drbd_err(device, "Allocation of new disk_conf failed\n");
3951 return -ENOMEM;
3954 old_disk_conf = device->ldev->disk_conf;
3955 *new_disk_conf = *old_disk_conf;
3957 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3960 if (apv >= 88) {
3961 if (apv == 88) {
3962 if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3963 drbd_err(device, "verify-alg of wrong size, "
3964 "peer wants %u, accepting only up to %u byte\n",
3965 data_size, SHARED_SECRET_MAX);
3966 err = -EIO;
3967 goto reconnect;
3970 err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3971 if (err)
3972 goto reconnect;
3973 /* we expect NUL terminated string */
3974 /* but just in case someone tries to be evil */
3975 D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3976 p->verify_alg[data_size-1] = 0;
3978 } else /* apv >= 89 */ {
3979 /* we still expect NUL terminated strings */
3980 /* but just in case someone tries to be evil */
3981 D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3982 D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3983 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3984 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3987 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3988 if (device->state.conn == C_WF_REPORT_PARAMS) {
3989 drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3990 old_net_conf->verify_alg, p->verify_alg);
3991 goto disconnect;
3993 verify_tfm = drbd_crypto_alloc_digest_safe(device,
3994 p->verify_alg, "verify-alg");
3995 if (IS_ERR(verify_tfm)) {
3996 verify_tfm = NULL;
3997 goto disconnect;
4001 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
4002 if (device->state.conn == C_WF_REPORT_PARAMS) {
4003 drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
4004 old_net_conf->csums_alg, p->csums_alg);
4005 goto disconnect;
4007 csums_tfm = drbd_crypto_alloc_digest_safe(device,
4008 p->csums_alg, "csums-alg");
4009 if (IS_ERR(csums_tfm)) {
4010 csums_tfm = NULL;
4011 goto disconnect;
4015 if (apv > 94 && new_disk_conf) {
4016 new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
4017 new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
4018 new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
4019 new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
4021 fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
4022 if (fifo_size != device->rs_plan_s->size) {
4023 new_plan = fifo_alloc(fifo_size);
4024 if (!new_plan) {
4025 drbd_err(device, "kmalloc of fifo_buffer failed");
4026 put_ldev(device);
4027 goto disconnect;
4032 if (verify_tfm || csums_tfm) {
4033 new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
4034 if (!new_net_conf) {
4035 drbd_err(device, "Allocation of new net_conf failed\n");
4036 goto disconnect;
4039 *new_net_conf = *old_net_conf;
4041 if (verify_tfm) {
4042 strcpy(new_net_conf->verify_alg, p->verify_alg);
4043 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
4044 crypto_free_shash(peer_device->connection->verify_tfm);
4045 peer_device->connection->verify_tfm = verify_tfm;
4046 drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
4048 if (csums_tfm) {
4049 strcpy(new_net_conf->csums_alg, p->csums_alg);
4050 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
4051 crypto_free_shash(peer_device->connection->csums_tfm);
4052 peer_device->connection->csums_tfm = csums_tfm;
4053 drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
4055 rcu_assign_pointer(connection->net_conf, new_net_conf);
4059 if (new_disk_conf) {
4060 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
4061 put_ldev(device);
4064 if (new_plan) {
4065 old_plan = device->rs_plan_s;
4066 rcu_assign_pointer(device->rs_plan_s, new_plan);
4069 mutex_unlock(&connection->resource->conf_update);
4070 synchronize_rcu();
4071 if (new_net_conf)
4072 kfree(old_net_conf);
4073 kfree(old_disk_conf);
4074 kfree(old_plan);
4076 return 0;
4078 reconnect:
4079 if (new_disk_conf) {
4080 put_ldev(device);
4081 kfree(new_disk_conf);
4083 mutex_unlock(&connection->resource->conf_update);
4084 return -EIO;
4086 disconnect:
4087 kfree(new_plan);
4088 if (new_disk_conf) {
4089 put_ldev(device);
4090 kfree(new_disk_conf);
4092 mutex_unlock(&connection->resource->conf_update);
4093 /* just for completeness: actually not needed,
4094 * as this is not reached if csums_tfm was ok. */
4095 crypto_free_shash(csums_tfm);
4096 /* but free the verify_tfm again, if csums_tfm did not work out */
4097 crypto_free_shash(verify_tfm);
4098 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4099 return -EIO;
4102 /* warn if the arguments differ by more than 12.5% */
4103 static void warn_if_differ_considerably(struct drbd_device *device,
4104 const char *s, sector_t a, sector_t b)
4106 sector_t d;
4107 if (a == 0 || b == 0)
4108 return;
4109 d = (a > b) ? (a - b) : (b - a);
4110 if (d > (a>>3) || d > (b>>3))
4111 drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
4112 (unsigned long long)a, (unsigned long long)b);
4115 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
4117 struct drbd_peer_device *peer_device;
4118 struct drbd_device *device;
4119 struct p_sizes *p = pi->data;
4120 struct o_qlim *o = (connection->agreed_features & DRBD_FF_WSAME) ? p->qlim : NULL;
4121 enum determine_dev_size dd = DS_UNCHANGED;
4122 sector_t p_size, p_usize, p_csize, my_usize;
4123 sector_t new_size, cur_size;
4124 int ldsc = 0; /* local disk size changed */
4125 enum dds_flags ddsf;
4127 peer_device = conn_peer_device(connection, pi->vnr);
4128 if (!peer_device)
4129 return config_unknown_volume(connection, pi);
4130 device = peer_device->device;
4131 cur_size = drbd_get_capacity(device->this_bdev);
4133 p_size = be64_to_cpu(p->d_size);
4134 p_usize = be64_to_cpu(p->u_size);
4135 p_csize = be64_to_cpu(p->c_size);
4137 /* just store the peer's disk size for now.
4138 * we still need to figure out whether we accept that. */
4139 device->p_size = p_size;
4141 if (get_ldev(device)) {
4142 rcu_read_lock();
4143 my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
4144 rcu_read_unlock();
4146 warn_if_differ_considerably(device, "lower level device sizes",
4147 p_size, drbd_get_max_capacity(device->ldev));
4148 warn_if_differ_considerably(device, "user requested size",
4149 p_usize, my_usize);
4151 /* if this is the first connect, or an otherwise expected
4152 * param exchange, choose the minimum */
4153 if (device->state.conn == C_WF_REPORT_PARAMS)
4154 p_usize = min_not_zero(my_usize, p_usize);
4156 /* Never shrink a device with usable data during connect,
4157 * or "attach" on the peer.
4158 * But allow online shrinking if we are connected. */
4159 new_size = drbd_new_dev_size(device, device->ldev, p_usize, 0);
4160 if (new_size < cur_size &&
4161 device->state.disk >= D_OUTDATED &&
4162 (device->state.conn < C_CONNECTED || device->state.pdsk == D_DISKLESS)) {
4163 drbd_err(device, "The peer's disk size is too small! (%llu < %llu sectors)\n",
4164 (unsigned long long)new_size, (unsigned long long)cur_size);
4165 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4166 put_ldev(device);
4167 return -EIO;
4170 if (my_usize != p_usize) {
4171 struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
4173 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
4174 if (!new_disk_conf) {
4175 drbd_err(device, "Allocation of new disk_conf failed\n");
4176 put_ldev(device);
4177 return -ENOMEM;
4180 mutex_lock(&connection->resource->conf_update);
4181 old_disk_conf = device->ldev->disk_conf;
4182 *new_disk_conf = *old_disk_conf;
4183 new_disk_conf->disk_size = p_usize;
4185 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
4186 mutex_unlock(&connection->resource->conf_update);
4187 synchronize_rcu();
4188 kfree(old_disk_conf);
4190 drbd_info(device, "Peer sets u_size to %lu sectors (old: %lu)\n",
4191 (unsigned long)p_usize, (unsigned long)my_usize);
4194 put_ldev(device);
4197 device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
4198 /* Leave drbd_reconsider_queue_parameters() before drbd_determine_dev_size().
4199 In case we cleared the QUEUE_FLAG_DISCARD from our queue in
4200 drbd_reconsider_queue_parameters(), we can be sure that after
4201 drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
4203 ddsf = be16_to_cpu(p->dds_flags);
4204 if (get_ldev(device)) {
4205 drbd_reconsider_queue_parameters(device, device->ldev, o);
4206 dd = drbd_determine_dev_size(device, ddsf, NULL);
4207 put_ldev(device);
4208 if (dd == DS_ERROR)
4209 return -EIO;
4210 drbd_md_sync(device);
4211 } else {
4213 * I am diskless, need to accept the peer's *current* size.
4214 * I must NOT accept the peers backing disk size,
4215 * it may have been larger than mine all along...
4217 * At this point, the peer knows more about my disk, or at
4218 * least about what we last agreed upon, than myself.
4219 * So if his c_size is less than his d_size, the most likely
4220 * reason is that *my* d_size was smaller last time we checked.
4222 * However, if he sends a zero current size,
4223 * take his (user-capped or) backing disk size anyways.
4225 * Unless of course he does not have a disk himself.
4226 * In which case we ignore this completely.
4228 sector_t new_size = p_csize ?: p_usize ?: p_size;
4229 drbd_reconsider_queue_parameters(device, NULL, o);
4230 if (new_size == 0) {
4231 /* Ignore, peer does not know nothing. */
4232 } else if (new_size == cur_size) {
4233 /* nothing to do */
4234 } else if (cur_size != 0 && p_size == 0) {
4235 drbd_warn(device, "Ignored diskless peer device size (peer:%llu != me:%llu sectors)!\n",
4236 (unsigned long long)new_size, (unsigned long long)cur_size);
4237 } else if (new_size < cur_size && device->state.role == R_PRIMARY) {
4238 drbd_err(device, "The peer's device size is too small! (%llu < %llu sectors); demote me first!\n",
4239 (unsigned long long)new_size, (unsigned long long)cur_size);
4240 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4241 return -EIO;
4242 } else {
4243 /* I believe the peer, if
4244 * - I don't have a current size myself
4245 * - we agree on the size anyways
4246 * - I do have a current size, am Secondary,
4247 * and he has the only disk
4248 * - I do have a current size, am Primary,
4249 * and he has the only disk,
4250 * which is larger than my current size
4252 drbd_set_my_capacity(device, new_size);
4256 if (get_ldev(device)) {
4257 if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
4258 device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
4259 ldsc = 1;
4262 put_ldev(device);
4265 if (device->state.conn > C_WF_REPORT_PARAMS) {
4266 if (be64_to_cpu(p->c_size) !=
4267 drbd_get_capacity(device->this_bdev) || ldsc) {
4268 /* we have different sizes, probably peer
4269 * needs to know my new size... */
4270 drbd_send_sizes(peer_device, 0, ddsf);
4272 if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
4273 (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
4274 if (device->state.pdsk >= D_INCONSISTENT &&
4275 device->state.disk >= D_INCONSISTENT) {
4276 if (ddsf & DDSF_NO_RESYNC)
4277 drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
4278 else
4279 resync_after_online_grow(device);
4280 } else
4281 set_bit(RESYNC_AFTER_NEG, &device->flags);
4285 return 0;
4288 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
4290 struct drbd_peer_device *peer_device;
4291 struct drbd_device *device;
4292 struct p_uuids *p = pi->data;
4293 u64 *p_uuid;
4294 int i, updated_uuids = 0;
4296 peer_device = conn_peer_device(connection, pi->vnr);
4297 if (!peer_device)
4298 return config_unknown_volume(connection, pi);
4299 device = peer_device->device;
4301 p_uuid = kmalloc_array(UI_EXTENDED_SIZE, sizeof(*p_uuid), GFP_NOIO);
4302 if (!p_uuid) {
4303 drbd_err(device, "kmalloc of p_uuid failed\n");
4304 return false;
4307 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
4308 p_uuid[i] = be64_to_cpu(p->uuid[i]);
4310 kfree(device->p_uuid);
4311 device->p_uuid = p_uuid;
4313 if ((device->state.conn < C_CONNECTED || device->state.pdsk == D_DISKLESS) &&
4314 device->state.disk < D_INCONSISTENT &&
4315 device->state.role == R_PRIMARY &&
4316 (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
4317 drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
4318 (unsigned long long)device->ed_uuid);
4319 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4320 return -EIO;
4323 if (get_ldev(device)) {
4324 int skip_initial_sync =
4325 device->state.conn == C_CONNECTED &&
4326 peer_device->connection->agreed_pro_version >= 90 &&
4327 device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
4328 (p_uuid[UI_FLAGS] & 8);
4329 if (skip_initial_sync) {
4330 drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
4331 drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
4332 "clear_n_write from receive_uuids",
4333 BM_LOCKED_TEST_ALLOWED);
4334 _drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
4335 _drbd_uuid_set(device, UI_BITMAP, 0);
4336 _drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
4337 CS_VERBOSE, NULL);
4338 drbd_md_sync(device);
4339 updated_uuids = 1;
4341 put_ldev(device);
4342 } else if (device->state.disk < D_INCONSISTENT &&
4343 device->state.role == R_PRIMARY) {
4344 /* I am a diskless primary, the peer just created a new current UUID
4345 for me. */
4346 updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4349 /* Before we test for the disk state, we should wait until an eventually
4350 ongoing cluster wide state change is finished. That is important if
4351 we are primary and are detaching from our disk. We need to see the
4352 new disk state... */
4353 mutex_lock(device->state_mutex);
4354 mutex_unlock(device->state_mutex);
4355 if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
4356 updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4358 if (updated_uuids)
4359 drbd_print_uuids(device, "receiver updated UUIDs to");
4361 return 0;
4365 * convert_state() - Converts the peer's view of the cluster state to our point of view
4366 * @ps: The state as seen by the peer.
4368 static union drbd_state convert_state(union drbd_state ps)
4370 union drbd_state ms;
4372 static enum drbd_conns c_tab[] = {
4373 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
4374 [C_CONNECTED] = C_CONNECTED,
4376 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
4377 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
4378 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
4379 [C_VERIFY_S] = C_VERIFY_T,
4380 [C_MASK] = C_MASK,
4383 ms.i = ps.i;
4385 ms.conn = c_tab[ps.conn];
4386 ms.peer = ps.role;
4387 ms.role = ps.peer;
4388 ms.pdsk = ps.disk;
4389 ms.disk = ps.pdsk;
4390 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
4392 return ms;
4395 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
4397 struct drbd_peer_device *peer_device;
4398 struct drbd_device *device;
4399 struct p_req_state *p = pi->data;
4400 union drbd_state mask, val;
4401 enum drbd_state_rv rv;
4403 peer_device = conn_peer_device(connection, pi->vnr);
4404 if (!peer_device)
4405 return -EIO;
4406 device = peer_device->device;
4408 mask.i = be32_to_cpu(p->mask);
4409 val.i = be32_to_cpu(p->val);
4411 if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
4412 mutex_is_locked(device->state_mutex)) {
4413 drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
4414 return 0;
4417 mask = convert_state(mask);
4418 val = convert_state(val);
4420 rv = drbd_change_state(device, CS_VERBOSE, mask, val);
4421 drbd_send_sr_reply(peer_device, rv);
4423 drbd_md_sync(device);
4425 return 0;
4428 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
4430 struct p_req_state *p = pi->data;
4431 union drbd_state mask, val;
4432 enum drbd_state_rv rv;
4434 mask.i = be32_to_cpu(p->mask);
4435 val.i = be32_to_cpu(p->val);
4437 if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4438 mutex_is_locked(&connection->cstate_mutex)) {
4439 conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
4440 return 0;
4443 mask = convert_state(mask);
4444 val = convert_state(val);
4446 rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4447 conn_send_sr_reply(connection, rv);
4449 return 0;
/*
 * receive_state() - handle the peer's state report for one volume
 *
 * Reads the peer's state from the packet, derives the peer disk state to
 * believe (real_peer_disk), optionally runs the sync handshake, and applies
 * the resulting connection / peer role / peer disk / peer_isp values with
 * _drbd_set_state().
 *
 * Returns 0 on success (including the early "resync finished" exits),
 * -ECONNRESET if the connection is already being torn down, -EIO on the
 * failure paths, or whatever config_unknown_volume() returns for an
 * unknown volume.
 */
4452 static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
4454 struct drbd_peer_device *peer_device;
4455 struct drbd_device *device;
4456 struct p_state *p = pi->data;
4457 union drbd_state os, ns, peer_state;
4458 enum drbd_disk_state real_peer_disk;
4459 enum chg_state_flags cs_flags;
4460 int rv;
4462 peer_device = conn_peer_device(connection, pi->vnr);
4463 if (!peer_device)
4464 return config_unknown_volume(connection, pi);
4465 device = peer_device->device;
4467 peer_state.i = be32_to_cpu(p->state);
4469 real_peer_disk = peer_state.disk;
/* NOTE(review): device->p_uuid is dereferenced below without a NULL check,
 * while the handshake branch further down tests device->p_uuid before
 * using it -- confirm it cannot be NULL when the peer reports
 * D_NEGOTIATING. */
4470 if (peer_state.disk == D_NEGOTIATING) {
4471 real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
4472 drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
/* Snapshot the current state under req_lock.  The lock is dropped for the
 * evaluation below; the snapshot is re-validated before it is applied. */
4475 spin_lock_irq(&device->resource->req_lock);
4476 retry:
4477 os = ns = drbd_read_state(device);
4478 spin_unlock_irq(&device->resource->req_lock);
4480 /* If some other part of the code (ack_receiver thread, timeout)
4481 * already decided to close the connection again,
4482 * we must not "re-establish" it here. */
4483 if (os.conn <= C_TEAR_DOWN)
4484 return -ECONNRESET;
4486 /* If this is the "end of sync" confirmation, usually the peer disk
4487 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
4488 * set) resync started in PausedSyncT, or if the timing of pause-/
4489 * unpause-sync events has been "just right", the peer disk may
4490 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
4492 if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
4493 real_peer_disk == D_UP_TO_DATE &&
4494 os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
4495 /* If we are (becoming) SyncSource, but peer is still in sync
4496 * preparation, ignore its uptodate-ness to avoid flapping, it
4497 * will change to inconsistent once the peer reaches active
4498 * syncing states.
4499 * It may have changed syncer-paused flags, however, so we
4500 * cannot ignore this completely. */
4501 if (peer_state.conn > C_CONNECTED &&
4502 peer_state.conn < C_SYNC_SOURCE)
4503 real_peer_disk = D_INCONSISTENT;
4505 /* if peer_state changes to connected at the same time,
4506 * it explicitly notifies us that it finished resync.
4507 * Maybe we should finish it up, too? */
4508 else if (os.conn >= C_SYNC_SOURCE &&
4509 peer_state.conn == C_CONNECTED) {
4510 if (drbd_bm_total_weight(device) <= device->rs_failed)
4511 drbd_resync_finished(device);
4512 return 0;
4516 /* explicit verify finished notification, stop sector reached. */
4517 if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
4518 peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
4519 ov_out_of_sync_print(device);
4520 drbd_resync_finished(device);
4521 return 0;
4524 /* peer says his disk is inconsistent, while we think it is uptodate,
4525 * and this happens while the peer still thinks we have a sync going on,
4526 * but we think we are already done with the sync.
4527 * We ignore this to avoid flapping pdsk.
4528 * This should not happen, if the peer is a recent version of drbd. */
4529 if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4530 os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4531 real_peer_disk = D_UP_TO_DATE;
4533 if (ns.conn == C_WF_REPORT_PARAMS)
4534 ns.conn = C_CONNECTED;
4536 if (peer_state.conn == C_AHEAD)
4537 ns.conn = C_BEHIND;
4539 /* TODO:
4540 * if (primary and diskless and peer uuid != effective uuid)
4541 * abort attach on peer;
4543 * If this node does not have good data, was already connected, but
4544 * the peer did a late attach only now, trying to "negotiate" with me,
4545 * AND I am currently Primary, possibly frozen, with some specific
4546 * "effective" uuid, this should never be reached, really, because
4547 * we first send the uuids, then the current state.
4549 * In this scenario, we already dropped the connection hard
4550 * when we received the unsuitable uuids (receive_uuids().
4552 * Should we want to change this, that is: not drop the connection in
4553 * receive_uuids() already, then we would need to add a branch here
4554 * that aborts the attach of "unsuitable uuids" on the peer in case
4555 * this node is currently Diskless Primary.
4558 if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4559 get_ldev_if_state(device, D_NEGOTIATING)) {
4560 int cr; /* consider resync */
4562 /* if we established a new connection */
4563 cr = (os.conn < C_CONNECTED);
4564 /* if we had an established connection
4565 * and one of the nodes newly attaches a disk */
4566 cr |= (os.conn == C_CONNECTED &&
4567 (peer_state.disk == D_NEGOTIATING ||
4568 os.disk == D_NEGOTIATING));
4569 /* if we have both been inconsistent, and the peer has been
4570 * forced to be UpToDate with --force */
4571 cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4572 /* if we had been plain connected, and the admin requested to
4573 * start a sync by "invalidate" or "invalidate-remote" */
4574 cr |= (os.conn == C_CONNECTED &&
4575 (peer_state.conn >= C_STARTING_SYNC_S &&
4576 peer_state.conn <= C_WF_BITMAP_T));
4578 if (cr)
4579 ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
4581 put_ldev(device);
4582 if (ns.conn == C_MASK) {
4583 ns.conn = C_CONNECTED;
4584 if (device->state.disk == D_NEGOTIATING) {
4585 drbd_force_state(device, NS(disk, D_FAILED));
4586 } else if (peer_state.disk == D_NEGOTIATING) {
4587 drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4588 peer_state.disk = D_DISKLESS;
4589 real_peer_disk = D_DISKLESS;
4590 } else {
4591 if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4592 return -EIO;
4593 D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4594 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4595 return -EIO;
/* Re-take req_lock and verify the state did not change while it was
 * dropped; if it did, redo the whole evaluation from the retry label
 * (which is reached with the lock held). */
4600 spin_lock_irq(&device->resource->req_lock);
4601 if (os.i != drbd_read_state(device).i)
4602 goto retry;
4603 clear_bit(CONSIDER_RESYNC, &device->flags);
4604 ns.peer = peer_state.role;
4605 ns.pdsk = real_peer_disk;
4606 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4607 if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4608 ns.disk = device->new_state_tmp.disk;
4609 cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4610 if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4611 test_bit(NEW_CUR_UUID, &device->flags)) {
4612 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4613 for temporal network outages! */
4614 spin_unlock_irq(&device->resource->req_lock);
4615 drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4616 tl_clear(peer_device->connection);
4617 drbd_uuid_new_current(device);
4618 clear_bit(NEW_CUR_UUID, &device->flags);
4619 conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4620 return -EIO;
/* Apply the new state and read back the resulting state, still under
 * req_lock. */
4622 rv = _drbd_set_state(device, ns, cs_flags, NULL);
4623 ns = drbd_read_state(device);
4624 spin_unlock_irq(&device->resource->req_lock);
4626 if (rv < SS_SUCCESS) {
4627 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4628 return -EIO;
4631 if (os.conn > C_WF_REPORT_PARAMS) {
4632 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4633 peer_state.disk != D_NEGOTIATING ) {
4634 /* we want resync, peer has not yet decided to sync... */
4635 /* Nowadays only used when forcing a node into primary role and
4636 setting its disk to UpToDate with that */
4637 drbd_send_uuids(peer_device);
4638 drbd_send_current_state(peer_device);
4642 clear_bit(DISCARD_MY_DATA, &device->flags);
4644 drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4646 return 0;
4649 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4651 struct drbd_peer_device *peer_device;
4652 struct drbd_device *device;
4653 struct p_rs_uuid *p = pi->data;
4655 peer_device = conn_peer_device(connection, pi->vnr);
4656 if (!peer_device)
4657 return -EIO;
4658 device = peer_device->device;
4660 wait_event(device->misc_wait,
4661 device->state.conn == C_WF_SYNC_UUID ||
4662 device->state.conn == C_BEHIND ||
4663 device->state.conn < C_CONNECTED ||
4664 device->state.disk < D_NEGOTIATING);
4666 /* D_ASSERT(device, device->state.conn == C_WF_SYNC_UUID ); */
4668 /* Here the _drbd_uuid_ functions are right, current should
4669 _not_ be rotated into the history */
4670 if (get_ldev_if_state(device, D_NEGOTIATING)) {
4671 _drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4672 _drbd_uuid_set(device, UI_BITMAP, 0UL);
4674 drbd_print_uuids(device, "updated sync uuid");
4675 drbd_start_resync(device, C_SYNC_TARGET);
4677 put_ldev(device);
4678 } else
4679 drbd_err(device, "Ignoring SyncUUID packet!\n");
4681 return 0;
4685 * receive_bitmap_plain
4687 * Return 0 when done, 1 when another iteration is needed, and a negative error
4688 * code upon failure.
4690 static int
4691 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4692 unsigned long *p, struct bm_xfer_ctx *c)
4694 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4695 drbd_header_size(peer_device->connection);
4696 unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4697 c->bm_words - c->word_offset);
4698 unsigned int want = num_words * sizeof(*p);
4699 int err;
4701 if (want != size) {
4702 drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4703 return -EIO;
4705 if (want == 0)
4706 return 0;
4707 err = drbd_recv_all(peer_device->connection, p, want);
4708 if (err)
4709 return err;
4711 drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4713 c->word_offset += num_words;
4714 c->bit_offset = c->word_offset * BITS_PER_LONG;
4715 if (c->bit_offset > c->bm_bits)
4716 c->bit_offset = c->bm_bits;
4718 return 1;
4721 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4723 return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4726 static int dcbp_get_start(struct p_compressed_bm *p)
4728 return (p->encoding & 0x80) != 0;
4731 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4733 return (p->encoding >> 4) & 0x7;
/**
 * recv_bm_rle_bits
 *
 * Return 0 when done, 1 when another iteration is needed, and a negative error
 * code upon failure.
 */
/*
 * Decode one run-length encoded bitmap chunk.
 *
 * The payload p->code (len bytes) is read as a bit stream of variable
 * length encoded run lengths.  Runs alternate between "leave alone" and
 * "set": toggle starts from dcbp_get_start(p) and flips after every run;
 * only runs seen while toggle is non-zero are passed to _drbd_bm_set_bits().
 * Decoding starts at c->bit_offset, and the reached position is stored back
 * into the transfer context at the end.
 */
4742 static int
4743 recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4744 struct p_compressed_bm *p,
4745 struct bm_xfer_ctx *c,
4746 unsigned int len)
4748 struct bitstream bs;
4749 u64 look_ahead;
4750 u64 rl;
4751 u64 tmp;
4752 unsigned long s = c->bit_offset;
4753 unsigned long e;
4754 int toggle = dcbp_get_start(p);
4755 int have;
4756 int bits;
4758 bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
/* Prime the look-ahead window with up to 64 bits of the stream. */
4760 bits = bitstream_get_bits(&bs, &look_ahead, 64);
4761 if (bits < 0)
4762 return -EIO;
/* Each iteration decodes one run length rl from the look-ahead window;
 * "have" counts the valid bits currently in the window. */
4764 for (have = bits; have > 0; s += rl, toggle = !toggle) {
4765 bits = vli_decode_bits(&rl, look_ahead);
4766 if (bits <= 0)
4767 return -EIO;
/* NOTE(review): e is computed in unsigned long from the u64 run length,
 * and e >= c->bm_bits is the only bound check in this function -- confirm
 * that a very large rl cannot make s + rl - 1 wrap around and pass it. */
4769 if (toggle) {
4770 e = s + rl -1;
4771 if (e >= c->bm_bits) {
4772 drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4773 return -EIO;
4775 _drbd_bm_set_bits(peer_device->device, s, e);
4778 if (have < bits) {
4779 drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4780 have, bits, look_ahead,
4781 (unsigned int)(bs.cur.b - p->code),
4782 (unsigned int)bs.buf_len);
4783 return -EIO;
4785 /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4786 if (likely(bits < 64))
4787 look_ahead >>= bits;
4788 else
4789 look_ahead = 0;
4790 have -= bits;
/* Refill the window from the stream, above the bits still unconsumed. */
4792 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4793 if (bits < 0)
4794 return -EIO;
4795 look_ahead |= tmp << have;
4796 have += bits;
4799 c->bit_offset = s;
4800 bm_xfer_ctx_bit_to_word_offset(c);
/* 1 ("more to come") unless the end of the bitmap has been reached. */
4802 return (s != c->bm_bits);
4806 * decode_bitmap_c
4808 * Return 0 when done, 1 when another iteration is needed, and a negative error
4809 * code upon failure.
4811 static int
4812 decode_bitmap_c(struct drbd_peer_device *peer_device,
4813 struct p_compressed_bm *p,
4814 struct bm_xfer_ctx *c,
4815 unsigned int len)
4817 if (dcbp_get_code(p) == RLE_VLI_Bits)
4818 return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4820 /* other variants had been implemented for evaluation,
4821 * but have been dropped as this one turned out to be "best"
4822 * during all our tests. */
4824 drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4825 conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4826 return -EIO;
4829 void INFO_bm_xfer_stats(struct drbd_device *device,
4830 const char *direction, struct bm_xfer_ctx *c)
4832 /* what would it take to transfer it "plaintext" */
4833 unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4834 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4835 unsigned int plain =
4836 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4837 c->bm_words * sizeof(unsigned long);
4838 unsigned int total = c->bytes[0] + c->bytes[1];
4839 unsigned int r;
4841 /* total can not be zero. but just in case: */
4842 if (total == 0)
4843 return;
4845 /* don't report if not compressed */
4846 if (total >= plain)
4847 return;
4849 /* total < plain. check for overflow, still */
4850 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4851 : (1000 * total / plain);
4853 if (r > 1000)
4854 r = 1000;
4856 r = 1000 - r;
4857 drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4858 "total %u; compression: %u.%u%%\n",
4859 direction,
4860 c->bytes[1], c->packets[1],
4861 c->bytes[0], c->packets[0],
4862 total, r/10, r % 10);
4865 /* Since we are processing the bitfield from lower addresses to higher,
4866 it does not matter if the process it in 32 bit chunks or 64 bit
4867 chunks as long as it is little endian. (Understand it as byte stream,
4868 beginning with the lowest byte...) If we would use big endian
4869 we would need to process it from the highest address to the lowest,
4870 in order to be agnostic to the 32 vs 64 bits issue.
4872 returns 0 on failure, 1 if we successfully received it. */
4873 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4875 struct drbd_peer_device *peer_device;
4876 struct drbd_device *device;
4877 struct bm_xfer_ctx c;
4878 int err;
4880 peer_device = conn_peer_device(connection, pi->vnr);
4881 if (!peer_device)
4882 return -EIO;
4883 device = peer_device->device;
4885 drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4886 /* you are supposed to send additional out-of-sync information
4887 * if you actually set bits during this phase */
4889 c = (struct bm_xfer_ctx) {
4890 .bm_bits = drbd_bm_bits(device),
4891 .bm_words = drbd_bm_words(device),
4894 for(;;) {
4895 if (pi->cmd == P_BITMAP)
4896 err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4897 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4898 /* MAYBE: sanity check that we speak proto >= 90,
4899 * and the feature is enabled! */
4900 struct p_compressed_bm *p = pi->data;
4902 if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4903 drbd_err(device, "ReportCBitmap packet too large\n");
4904 err = -EIO;
4905 goto out;
4907 if (pi->size <= sizeof(*p)) {
4908 drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4909 err = -EIO;
4910 goto out;
4912 err = drbd_recv_all(peer_device->connection, p, pi->size);
4913 if (err)
4914 goto out;
4915 err = decode_bitmap_c(peer_device, p, &c, pi->size);
4916 } else {
4917 drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
4918 err = -EIO;
4919 goto out;
4922 c.packets[pi->cmd == P_BITMAP]++;
4923 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4925 if (err <= 0) {
4926 if (err < 0)
4927 goto out;
4928 break;
4930 err = drbd_recv_header(peer_device->connection, pi);
4931 if (err)
4932 goto out;
4935 INFO_bm_xfer_stats(device, "receive", &c);
4937 if (device->state.conn == C_WF_BITMAP_T) {
4938 enum drbd_state_rv rv;
4940 err = drbd_send_bitmap(device);
4941 if (err)
4942 goto out;
4943 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4944 rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4945 D_ASSERT(device, rv == SS_SUCCESS);
4946 } else if (device->state.conn != C_WF_BITMAP_S) {
4947 /* admin may have requested C_DISCONNECTING,
4948 * other threads may have noticed network errors */
4949 drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4950 drbd_conn_str(device->state.conn));
4952 err = 0;
4954 out:
4955 drbd_bm_unlock(device);
4956 if (!err && device->state.conn == C_WF_BITMAP_S)
4957 drbd_start_resync(device, C_SYNC_SOURCE);
4958 return err;
4961 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4963 drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4964 pi->cmd, pi->size);
4966 return ignore_remaining_packet(connection, pi);
4969 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4971 /* Make sure we've acked all the TCP data associated
4972 * with the data requests being unplugged */
4973 drbd_tcp_quickack(connection->data.socket);
4975 return 0;
4978 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4980 struct drbd_peer_device *peer_device;
4981 struct drbd_device *device;
4982 struct p_block_desc *p = pi->data;
4984 peer_device = conn_peer_device(connection, pi->vnr);
4985 if (!peer_device)
4986 return -EIO;
4987 device = peer_device->device;
4989 switch (device->state.conn) {
4990 case C_WF_SYNC_UUID:
4991 case C_WF_BITMAP_T:
4992 case C_BEHIND:
4993 break;
4994 default:
4995 drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4996 drbd_conn_str(device->state.conn));
4999 drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
5001 return 0;
/*
 * Handle the peer's notice that a resync block is deallocated on its side.
 *
 * With a local disk, a peer request covering the reported range is queued
 * on device->sync_ee and submitted as REQ_OP_WRITE_ZEROES.  Without a local
 * disk, or if the submission fails, the resync I/O is completed and a
 * P_NEG_ACK is sent to the peer instead.
 *
 * Returns 0, -EIO if the volume is unknown, or -ENOMEM if no peer request
 * could be allocated.
 */
5004 static int receive_rs_deallocated(struct drbd_connection *connection, struct packet_info *pi)
5006 struct drbd_peer_device *peer_device;
5007 struct p_block_desc *p = pi->data;
5008 struct drbd_device *device;
5009 sector_t sector;
5010 int size, err = 0;
5012 peer_device = conn_peer_device(connection, pi->vnr);
5013 if (!peer_device)
5014 return -EIO;
5015 device = peer_device->device;
5017 sector = be64_to_cpu(p->sector);
5018 size = be32_to_cpu(p->blksize);
5020 dec_rs_pending(device);
5022 if (get_ldev(device)) {
5023 struct drbd_peer_request *peer_req;
5024 const int op = REQ_OP_WRITE_ZEROES;
5026 peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER, sector,
5027 size, 0, GFP_NOIO);
5028 if (!peer_req) {
5029 put_ldev(device);
5030 return -ENOMEM;
5033 peer_req->w.cb = e_end_resync_block;
5034 peer_req->submit_jif = jiffies;
5035 peer_req->flags |= EE_TRIM;
5037 spin_lock_irq(&device->resource->req_lock);
5038 list_add_tail(&peer_req->w.list, &device->sync_ee);
5039 spin_unlock_irq(&device->resource->req_lock);
/* NOTE(review): rs_sect_ev is advanced by pi->size >> 9 here, while
 * rs_sect_in at the end of the function uses the block size from the packet
 * (size >> 9) -- confirm that pi->size is the intended value. */
5041 atomic_add(pi->size >> 9, &device->rs_sect_ev);
5042 err = drbd_submit_peer_request(device, peer_req, op, 0, DRBD_FAULT_RS_WR);
/* Submission failed: take the request off sync_ee again, free it, and fall
 * into the negative-ack path below.  err is reset to 0 first, so this
 * failure is not returned to the caller. */
5044 if (err) {
5045 spin_lock_irq(&device->resource->req_lock);
5046 list_del(&peer_req->w.list);
5047 spin_unlock_irq(&device->resource->req_lock);
5049 drbd_free_peer_req(device, peer_req);
5050 put_ldev(device);
5051 err = 0;
5052 goto fail;
5055 inc_unacked(device);
5057 /* No put_ldev() here. Gets called in drbd_endio_write_sec_final(),
5058 as well as drbd_rs_complete_io() */
5059 } else {
/* Reached without a local disk, and via "goto fail" from the failed
 * submission above. */
5060 fail:
5061 drbd_rs_complete_io(device, sector);
5062 drbd_send_ack_ex(peer_device, P_NEG_ACK, sector, size, ID_SYNCER);
5065 atomic_add(size >> 9, &device->rs_sect_in);
5067 return err;
/* One entry of the receiver's packet dispatch table, drbd_cmd_handler[].
 * The table uses positional initializers, so the field order matters. */
5070 struct data_cmd {
/* non-zero if the packet may carry more data than pkt_size bytes */
5071 int expect_payload;
/* size of the fixed, packet specific part; the receiver uses it as the
 * sub header size */
5072 unsigned int pkt_size;
/* handler for this packet type; a NULL entry marks an unexpected packet */
5073 int (*fn)(struct drbd_connection *, struct packet_info *);
5076 static struct data_cmd drbd_cmd_handler[] = {
5077 [P_DATA] = { 1, sizeof(struct p_data), receive_Data },
5078 [P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply },
5079 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), receive_RSDataReply } ,
5080 [P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier } ,
5081 [P_BITMAP] = { 1, 0, receive_bitmap } ,
5082 [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
5083 [P_UNPLUG_REMOTE] = { 0, 0, receive_UnplugRemote },
5084 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
5085 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
5086 [P_SYNC_PARAM] = { 1, 0, receive_SyncParam },
5087 [P_SYNC_PARAM89] = { 1, 0, receive_SyncParam },
5088 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol },
5089 [P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids },
5090 [P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes },
5091 [P_STATE] = { 0, sizeof(struct p_state), receive_state },
5092 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state },
5093 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
5094 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
5095 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest },
5096 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
5097 [P_RS_THIN_REQ] = { 0, sizeof(struct p_block_req), receive_DataRequest },
5098 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip },
5099 [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
5100 [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
5101 [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
5102 [P_TRIM] = { 0, sizeof(struct p_trim), receive_Data },
5103 [P_ZEROES] = { 0, sizeof(struct p_trim), receive_Data },
5104 [P_RS_DEALLOCATED] = { 0, sizeof(struct p_block_desc), receive_rs_deallocated },
5105 [P_WSAME] = { 1, sizeof(struct p_wsame), receive_Data },
5108 static void drbdd(struct drbd_connection *connection)
5110 struct packet_info pi;
5111 size_t shs; /* sub header size */
5112 int err;
5114 while (get_t_state(&connection->receiver) == RUNNING) {
5115 struct data_cmd const *cmd;
5117 drbd_thread_current_set_cpu(&connection->receiver);
5118 update_receiver_timing_details(connection, drbd_recv_header_maybe_unplug);
5119 if (drbd_recv_header_maybe_unplug(connection, &pi))
5120 goto err_out;
5122 cmd = &drbd_cmd_handler[pi.cmd];
5123 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
5124 drbd_err(connection, "Unexpected data packet %s (0x%04x)",
5125 cmdname(pi.cmd), pi.cmd);
5126 goto err_out;
5129 shs = cmd->pkt_size;
5130 if (pi.cmd == P_SIZES && connection->agreed_features & DRBD_FF_WSAME)
5131 shs += sizeof(struct o_qlim);
5132 if (pi.size > shs && !cmd->expect_payload) {
5133 drbd_err(connection, "No payload expected %s l:%d\n",
5134 cmdname(pi.cmd), pi.size);
5135 goto err_out;
5137 if (pi.size < shs) {
5138 drbd_err(connection, "%s: unexpected packet size, expected:%d received:%d\n",
5139 cmdname(pi.cmd), (int)shs, pi.size);
5140 goto err_out;
5143 if (shs) {
5144 update_receiver_timing_details(connection, drbd_recv_all_warn);
5145 err = drbd_recv_all_warn(connection, pi.data, shs);
5146 if (err)
5147 goto err_out;
5148 pi.size -= shs;
5151 update_receiver_timing_details(connection, cmd->fn);
5152 err = cmd->fn(connection, &pi);
5153 if (err) {
5154 drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
5155 cmdname(pi.cmd), err, pi.size);
5156 goto err_out;
5159 return;
5161 err_out:
5162 conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
5165 static void conn_disconnect(struct drbd_connection *connection)
5167 struct drbd_peer_device *peer_device;
5168 enum drbd_conns oc;
5169 int vnr;
5171 if (connection->cstate == C_STANDALONE)
5172 return;
5174 /* We are about to start the cleanup after connection loss.
5175 * Make sure drbd_make_request knows about that.
5176 * Usually we should be in some network failure state already,
5177 * but just in case we are not, we fix it up here.
5179 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5181 /* ack_receiver does not clean up anything. it must not interfere, either */
5182 drbd_thread_stop(&connection->ack_receiver);
5183 if (connection->ack_sender) {
5184 destroy_workqueue(connection->ack_sender);
5185 connection->ack_sender = NULL;
5187 drbd_free_sock(connection);
5189 rcu_read_lock();
5190 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5191 struct drbd_device *device = peer_device->device;
5192 kref_get(&device->kref);
5193 rcu_read_unlock();
5194 drbd_disconnected(peer_device);
5195 kref_put(&device->kref, drbd_destroy_device);
5196 rcu_read_lock();
5198 rcu_read_unlock();
5200 if (!list_empty(&connection->current_epoch->list))
5201 drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
5202 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
5203 atomic_set(&connection->current_epoch->epoch_size, 0);
5204 connection->send.seen_any_write_yet = false;
5206 drbd_info(connection, "Connection closed\n");
5208 if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
5209 conn_try_outdate_peer_async(connection);
5211 spin_lock_irq(&connection->resource->req_lock);
5212 oc = connection->cstate;
5213 if (oc >= C_UNCONNECTED)
5214 _conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
5216 spin_unlock_irq(&connection->resource->req_lock);
5218 if (oc == C_DISCONNECTING)
5219 conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
5222 static int drbd_disconnected(struct drbd_peer_device *peer_device)
5224 struct drbd_device *device = peer_device->device;
5225 unsigned int i;
5227 /* wait for current activity to cease. */
5228 spin_lock_irq(&device->resource->req_lock);
5229 _drbd_wait_ee_list_empty(device, &device->active_ee);
5230 _drbd_wait_ee_list_empty(device, &device->sync_ee);
5231 _drbd_wait_ee_list_empty(device, &device->read_ee);
5232 spin_unlock_irq(&device->resource->req_lock);
5234 /* We do not have data structures that would allow us to
5235 * get the rs_pending_cnt down to 0 again.
5236 * * On C_SYNC_TARGET we do not have any data structures describing
5237 * the pending RSDataRequest's we have sent.
5238 * * On C_SYNC_SOURCE there is no data structure that tracks
5239 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
5240 * And no, it is not the sum of the reference counts in the
5241 * resync_LRU. The resync_LRU tracks the whole operation including
5242 * the disk-IO, while the rs_pending_cnt only tracks the blocks
5243 * on the fly. */
5244 drbd_rs_cancel_all(device);
5245 device->rs_total = 0;
5246 device->rs_failed = 0;
5247 atomic_set(&device->rs_pending_cnt, 0);
5248 wake_up(&device->misc_wait);
5250 del_timer_sync(&device->resync_timer);
5251 resync_timer_fn(&device->resync_timer);
5253 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
5254 * w_make_resync_request etc. which may still be on the worker queue
5255 * to be "canceled" */
5256 drbd_flush_workqueue(&peer_device->connection->sender_work);
5258 drbd_finish_peer_reqs(device);
5260 /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
5261 might have issued a work again. The one before drbd_finish_peer_reqs() is
5262 necessary to reclain net_ee in drbd_finish_peer_reqs(). */
5263 drbd_flush_workqueue(&peer_device->connection->sender_work);
5265 /* need to do it again, drbd_finish_peer_reqs() may have populated it
5266 * again via drbd_try_clear_on_disk_bm(). */
5267 drbd_rs_cancel_all(device);
5269 kfree(device->p_uuid);
5270 device->p_uuid = NULL;
5272 if (!drbd_suspended(device))
5273 tl_clear(peer_device->connection);
5275 drbd_md_sync(device);
5277 if (get_ldev(device)) {
5278 drbd_bitmap_io(device, &drbd_bm_write_copy_pages,
5279 "write from disconnected", BM_LOCKED_CHANGE_ALLOWED);
5280 put_ldev(device);
5283 /* tcp_close and release of sendpage pages can be deferred. I don't
5284 * want to use SO_LINGER, because apparently it can be deferred for
5285 * more than 20 seconds (longest time I checked).
5287 * Actually we don't care for exactly when the network stack does its
5288 * put_page(), but release our reference on these pages right here.
5290 i = drbd_free_peer_reqs(device, &device->net_ee);
5291 if (i)
5292 drbd_info(device, "net_ee not empty, killed %u entries\n", i);
5293 i = atomic_read(&device->pp_in_use_by_net);
5294 if (i)
5295 drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
5296 i = atomic_read(&device->pp_in_use);
5297 if (i)
5298 drbd_info(device, "pp_in_use = %d, expected 0\n", i);
5300 D_ASSERT(device, list_empty(&device->read_ee));
5301 D_ASSERT(device, list_empty(&device->active_ee));
5302 D_ASSERT(device, list_empty(&device->sync_ee));
5303 D_ASSERT(device, list_empty(&device->done_ee));
5305 return 0;
5309 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
5310 * we can agree on is stored in agreed_pro_version.
5312 * feature flags and the reserved array should be enough room for future
5313 * enhancements of the handshake protocol, and possible plugins...
5315 * for now, they are expected to be zero, but ignored.
5317 static int drbd_send_features(struct drbd_connection *connection)
5319 struct drbd_socket *sock;
5320 struct p_connection_features *p;
5322 sock = &connection->data;
5323 p = conn_prepare_command(connection, sock);
5324 if (!p)
5325 return -EIO;
5326 memset(p, 0, sizeof(*p));
5327 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
5328 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
5329 p->feature_flags = cpu_to_be32(PRO_FEATURES);
5330 return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
5334 * return values:
5335 * 1 yes, we have a valid connection
5336 * 0 oops, did not work out, please try again
5337 * -1 peer talks different language,
5338 * no point in trying again, please go standalone.
5340 static int drbd_do_features(struct drbd_connection *connection)
5342 /* ASSERT current == connection->receiver ... */
5343 struct p_connection_features *p;
5344 const int expect = sizeof(struct p_connection_features);
5345 struct packet_info pi;
5346 int err;
5348 err = drbd_send_features(connection);
5349 if (err)
5350 return 0;
5352 err = drbd_recv_header(connection, &pi);
5353 if (err)
5354 return 0;
5356 if (pi.cmd != P_CONNECTION_FEATURES) {
5357 drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
5358 cmdname(pi.cmd), pi.cmd);
5359 return -1;
5362 if (pi.size != expect) {
5363 drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
5364 expect, pi.size);
5365 return -1;
5368 p = pi.data;
5369 err = drbd_recv_all_warn(connection, p, expect);
5370 if (err)
5371 return 0;
5373 p->protocol_min = be32_to_cpu(p->protocol_min);
5374 p->protocol_max = be32_to_cpu(p->protocol_max);
5375 if (p->protocol_max == 0)
5376 p->protocol_max = p->protocol_min;
5378 if (PRO_VERSION_MAX < p->protocol_min ||
5379 PRO_VERSION_MIN > p->protocol_max)
5380 goto incompat;
5382 connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
5383 connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
5385 drbd_info(connection, "Handshake successful: "
5386 "Agreed network protocol version %d\n", connection->agreed_pro_version);
5388 drbd_info(connection, "Feature flags enabled on protocol level: 0x%x%s%s%s%s.\n",
5389 connection->agreed_features,
5390 connection->agreed_features & DRBD_FF_TRIM ? " TRIM" : "",
5391 connection->agreed_features & DRBD_FF_THIN_RESYNC ? " THIN_RESYNC" : "",
5392 connection->agreed_features & DRBD_FF_WSAME ? " WRITE_SAME" : "",
5393 connection->agreed_features & DRBD_FF_WZEROES ? " WRITE_ZEROES" :
5394 connection->agreed_features ? "" : " none");
5396 return 1;
5398 incompat:
5399 drbd_err(connection, "incompatible DRBD dialects: "
5400 "I support %d-%d, peer supports %d-%d\n",
5401 PRO_VERSION_MIN, PRO_VERSION_MAX,
5402 p->protocol_min, p->protocol_max);
5403 return -1;
#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
/* Stub for kernels without HMAC support: authentication always fails. */
static int drbd_do_auth(struct drbd_connection *connection)
{
	drbd_err(connection, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
	drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
	return -1;
}
#else
#define CHALLENGE_LEN 64

/*
 * drbd_do_auth() - mutual challenge/response authentication with the peer
 * @connection: connection to authenticate
 *
 * Sends a random challenge, answers the peer's challenge with the HMAC of
 * it keyed by the shared secret, then verifies the peer's answer to our
 * own challenge.
 *
 * Return value:
 *	1 - auth succeeded,
 *	0 - failed, try again (network error),
 *	-1 - auth failed, don't try again.
 */
static int drbd_do_auth(struct drbd_connection *connection)
{
	struct drbd_socket *sock;
	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
	char *response = NULL;
	char *right_response = NULL;
	char *peers_ch = NULL;
	unsigned int key_len;
	char secret[SHARED_SECRET_MAX]; /* 64 byte */
	unsigned int resp_size;
	SHASH_DESC_ON_STACK(desc, connection->cram_hmac_tfm);
	struct packet_info pi;
	struct net_conf *nc;
	int err, rv;

	/* FIXME: Put the challenge/response into the preallocated socket buffer.  */

	/* Take a private copy of the secret so it can be used outside RCU. */
	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	key_len = strlen(nc->shared_secret);
	memcpy(secret, nc->shared_secret, key_len);
	rcu_read_unlock();

	desc->tfm = connection->cram_hmac_tfm;
	desc->flags = 0;

	rv = crypto_shash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
	/*
	 * The stack copy of the shared secret is not needed after setkey;
	 * wipe it so it does not linger on the stack, just like the hash
	 * descriptor is wiped on exit.
	 */
	memzero_explicit(secret, sizeof(secret));
	if (rv) {
		drbd_err(connection, "crypto_shash_setkey() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	get_random_bytes(my_challenge, CHALLENGE_LEN);

	sock = &connection->data;
	if (!conn_prepare_command(connection, sock)) {
		rv = 0;
		goto fail;
	}
	rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
				my_challenge, CHALLENGE_LEN);
	if (!rv)
		goto fail;

	err = drbd_recv_header(connection, &pi);
	if (err) {
		rv = 0;
		goto fail;
	}

	if (pi.cmd != P_AUTH_CHALLENGE) {
		drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
			 cmdname(pi.cmd), pi.cmd);
		rv = -1;
		goto fail;
	}

	if (pi.size > CHALLENGE_LEN * 2) {
		drbd_err(connection, "expected AuthChallenge payload too big.\n");
		rv = -1;
		goto fail;
	}

	if (pi.size < CHALLENGE_LEN) {
		drbd_err(connection, "AuthChallenge payload too small.\n");
		rv = -1;
		goto fail;
	}

	peers_ch = kmalloc(pi.size, GFP_NOIO);
	if (peers_ch == NULL) {
		drbd_err(connection, "kmalloc of peers_ch failed\n");
		rv = -1;
		goto fail;
	}

	err = drbd_recv_all_warn(connection, peers_ch, pi.size);
	if (err) {
		rv = 0;
		goto fail;
	}

	/* A peer echoing our own challenge back could make us sign it for them. */
	if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
		drbd_err(connection, "Peer presented the same challenge!\n");
		rv = -1;
		goto fail;
	}

	resp_size = crypto_shash_digestsize(connection->cram_hmac_tfm);
	response = kmalloc(resp_size, GFP_NOIO);
	if (response == NULL) {
		drbd_err(connection, "kmalloc of response failed\n");
		rv = -1;
		goto fail;
	}

	rv = crypto_shash_digest(desc, peers_ch, pi.size, response);
	if (rv) {
		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	if (!conn_prepare_command(connection, sock)) {
		rv = 0;
		goto fail;
	}
	rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
				response, resp_size);
	if (!rv)
		goto fail;

	err = drbd_recv_header(connection, &pi);
	if (err) {
		rv = 0;
		goto fail;
	}

	if (pi.cmd != P_AUTH_RESPONSE) {
		drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
			 cmdname(pi.cmd), pi.cmd);
		rv = 0;
		goto fail;
	}

	if (pi.size != resp_size) {
		drbd_err(connection, "expected AuthResponse payload of wrong size\n");
		rv = 0;
		goto fail;
	}

	/* The response buffer is reused for the peer's answer. */
	err = drbd_recv_all_warn(connection, response , resp_size);
	if (err) {
		rv = 0;
		goto fail;
	}

	right_response = kmalloc(resp_size, GFP_NOIO);
	if (right_response == NULL) {
		drbd_err(connection, "kmalloc of right_response failed\n");
		rv = -1;
		goto fail;
	}

	rv = crypto_shash_digest(desc, my_challenge, CHALLENGE_LEN,
				 right_response);
	if (rv) {
		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	/*
	 * NOTE(review): memcmp() is not constant-time; a constant-time
	 * comparison would be preferable for comparing MACs.
	 */
	rv = !memcmp(response, right_response, resp_size);

	if (rv)
		drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
		     resp_size);
	else
		rv = -1;

 fail:
	kfree(peers_ch);
	kfree(response);
	kfree(right_response);
	shash_desc_zero(desc);

	return rv;
}
#endif
5593 int drbd_receiver(struct drbd_thread *thi)
5595 struct drbd_connection *connection = thi->connection;
5596 int h;
5598 drbd_info(connection, "receiver (re)started\n");
5600 do {
5601 h = conn_connect(connection);
5602 if (h == 0) {
5603 conn_disconnect(connection);
5604 schedule_timeout_interruptible(HZ);
5606 if (h == -1) {
5607 drbd_warn(connection, "Discarding network configuration.\n");
5608 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5610 } while (h == 0);
5612 if (h > 0) {
5613 blk_start_plug(&connection->receiver_plug);
5614 drbdd(connection);
5615 blk_finish_plug(&connection->receiver_plug);
5618 conn_disconnect(connection);
5620 drbd_info(connection, "receiver terminated\n");
5621 return 0;
5624 /* ********* acknowledge sender ******** */
5626 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5628 struct p_req_state_reply *p = pi->data;
5629 int retcode = be32_to_cpu(p->retcode);
5631 if (retcode >= SS_SUCCESS) {
5632 set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
5633 } else {
5634 set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5635 drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5636 drbd_set_st_err_str(retcode), retcode);
5638 wake_up(&connection->ping_wait);
5640 return 0;
5643 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5645 struct drbd_peer_device *peer_device;
5646 struct drbd_device *device;
5647 struct p_req_state_reply *p = pi->data;
5648 int retcode = be32_to_cpu(p->retcode);
5650 peer_device = conn_peer_device(connection, pi->vnr);
5651 if (!peer_device)
5652 return -EIO;
5653 device = peer_device->device;
5655 if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5656 D_ASSERT(device, connection->agreed_pro_version < 100);
5657 return got_conn_RqSReply(connection, pi);
5660 if (retcode >= SS_SUCCESS) {
5661 set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5662 } else {
5663 set_bit(CL_ST_CHG_FAIL, &device->flags);
5664 drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5665 drbd_set_st_err_str(retcode), retcode);
5667 wake_up(&device->state_wait);
5669 return 0;
/* Answer a P_PING from the peer; returns the result of sending the ack. */
static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
{
	int err = drbd_send_ping_ack(connection);

	return err;
}
5678 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5680 /* restore idle timeout */
5681 connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
5682 if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5683 wake_up(&connection->ping_wait);
5685 return 0;
5688 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5690 struct drbd_peer_device *peer_device;
5691 struct drbd_device *device;
5692 struct p_block_ack *p = pi->data;
5693 sector_t sector = be64_to_cpu(p->sector);
5694 int blksize = be32_to_cpu(p->blksize);
5696 peer_device = conn_peer_device(connection, pi->vnr);
5697 if (!peer_device)
5698 return -EIO;
5699 device = peer_device->device;
5701 D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5703 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5705 if (get_ldev(device)) {
5706 drbd_rs_complete_io(device, sector);
5707 drbd_set_in_sync(device, sector, blksize);
5708 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5709 device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5710 put_ldev(device);
5712 dec_rs_pending(device);
5713 atomic_add(blksize >> 9, &device->rs_sect_in);
5715 return 0;
5718 static int
5719 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5720 struct rb_root *root, const char *func,
5721 enum drbd_req_event what, bool missing_ok)
5723 struct drbd_request *req;
5724 struct bio_and_error m;
5726 spin_lock_irq(&device->resource->req_lock);
5727 req = find_request(device, root, id, sector, missing_ok, func);
5728 if (unlikely(!req)) {
5729 spin_unlock_irq(&device->resource->req_lock);
5730 return -EIO;
5732 __req_mod(req, what, &m);
5733 spin_unlock_irq(&device->resource->req_lock);
5735 if (m.bio)
5736 complete_master_bio(device, &m);
5737 return 0;
5740 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5742 struct drbd_peer_device *peer_device;
5743 struct drbd_device *device;
5744 struct p_block_ack *p = pi->data;
5745 sector_t sector = be64_to_cpu(p->sector);
5746 int blksize = be32_to_cpu(p->blksize);
5747 enum drbd_req_event what;
5749 peer_device = conn_peer_device(connection, pi->vnr);
5750 if (!peer_device)
5751 return -EIO;
5752 device = peer_device->device;
5754 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5756 if (p->block_id == ID_SYNCER) {
5757 drbd_set_in_sync(device, sector, blksize);
5758 dec_rs_pending(device);
5759 return 0;
5761 switch (pi->cmd) {
5762 case P_RS_WRITE_ACK:
5763 what = WRITE_ACKED_BY_PEER_AND_SIS;
5764 break;
5765 case P_WRITE_ACK:
5766 what = WRITE_ACKED_BY_PEER;
5767 break;
5768 case P_RECV_ACK:
5769 what = RECV_ACKED_BY_PEER;
5770 break;
5771 case P_SUPERSEDED:
5772 what = CONFLICT_RESOLVED;
5773 break;
5774 case P_RETRY_WRITE:
5775 what = POSTPONE_WRITE;
5776 break;
5777 default:
5778 BUG();
5781 return validate_req_change_req_state(device, p->block_id, sector,
5782 &device->write_requests, __func__,
5783 what, false);
5786 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5788 struct drbd_peer_device *peer_device;
5789 struct drbd_device *device;
5790 struct p_block_ack *p = pi->data;
5791 sector_t sector = be64_to_cpu(p->sector);
5792 int size = be32_to_cpu(p->blksize);
5793 int err;
5795 peer_device = conn_peer_device(connection, pi->vnr);
5796 if (!peer_device)
5797 return -EIO;
5798 device = peer_device->device;
5800 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5802 if (p->block_id == ID_SYNCER) {
5803 dec_rs_pending(device);
5804 drbd_rs_failed_io(device, sector, size);
5805 return 0;
5808 err = validate_req_change_req_state(device, p->block_id, sector,
5809 &device->write_requests, __func__,
5810 NEG_ACKED, true);
5811 if (err) {
5812 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5813 The master bio might already be completed, therefore the
5814 request is no longer in the collision hash. */
5815 /* In Protocol B we might already have got a P_RECV_ACK
5816 but then get a P_NEG_ACK afterwards. */
5817 drbd_set_out_of_sync(device, sector, size);
5819 return 0;
5822 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5824 struct drbd_peer_device *peer_device;
5825 struct drbd_device *device;
5826 struct p_block_ack *p = pi->data;
5827 sector_t sector = be64_to_cpu(p->sector);
5829 peer_device = conn_peer_device(connection, pi->vnr);
5830 if (!peer_device)
5831 return -EIO;
5832 device = peer_device->device;
5834 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5836 drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5837 (unsigned long long)sector, be32_to_cpu(p->blksize));
5839 return validate_req_change_req_state(device, p->block_id, sector,
5840 &device->read_requests, __func__,
5841 NEG_ACKED, false);
5844 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5846 struct drbd_peer_device *peer_device;
5847 struct drbd_device *device;
5848 sector_t sector;
5849 int size;
5850 struct p_block_ack *p = pi->data;
5852 peer_device = conn_peer_device(connection, pi->vnr);
5853 if (!peer_device)
5854 return -EIO;
5855 device = peer_device->device;
5857 sector = be64_to_cpu(p->sector);
5858 size = be32_to_cpu(p->blksize);
5860 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5862 dec_rs_pending(device);
5864 if (get_ldev_if_state(device, D_FAILED)) {
5865 drbd_rs_complete_io(device, sector);
5866 switch (pi->cmd) {
5867 case P_NEG_RS_DREPLY:
5868 drbd_rs_failed_io(device, sector, size);
5869 case P_RS_CANCEL:
5870 break;
5871 default:
5872 BUG();
5874 put_ldev(device);
5877 return 0;
5880 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5882 struct p_barrier_ack *p = pi->data;
5883 struct drbd_peer_device *peer_device;
5884 int vnr;
5886 tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5888 rcu_read_lock();
5889 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5890 struct drbd_device *device = peer_device->device;
5892 if (device->state.conn == C_AHEAD &&
5893 atomic_read(&device->ap_in_flight) == 0 &&
5894 !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5895 device->start_resync_timer.expires = jiffies + HZ;
5896 add_timer(&device->start_resync_timer);
5899 rcu_read_unlock();
5901 return 0;
5904 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5906 struct drbd_peer_device *peer_device;
5907 struct drbd_device *device;
5908 struct p_block_ack *p = pi->data;
5909 struct drbd_device_work *dw;
5910 sector_t sector;
5911 int size;
5913 peer_device = conn_peer_device(connection, pi->vnr);
5914 if (!peer_device)
5915 return -EIO;
5916 device = peer_device->device;
5918 sector = be64_to_cpu(p->sector);
5919 size = be32_to_cpu(p->blksize);
5921 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5923 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5924 drbd_ov_out_of_sync_found(device, sector, size);
5925 else
5926 ov_out_of_sync_print(device);
5928 if (!get_ldev(device))
5929 return 0;
5931 drbd_rs_complete_io(device, sector);
5932 dec_rs_pending(device);
5934 --device->ov_left;
5936 /* let's advance progress step marks only for every other megabyte */
5937 if ((device->ov_left & 0x200) == 0x200)
5938 drbd_advance_rs_marks(device, device->ov_left);
5940 if (device->ov_left == 0) {
5941 dw = kmalloc(sizeof(*dw), GFP_NOIO);
5942 if (dw) {
5943 dw->w.cb = w_ov_finished;
5944 dw->device = device;
5945 drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5946 } else {
5947 drbd_err(device, "kmalloc(dw) failed.");
5948 ov_out_of_sync_print(device);
5949 drbd_resync_finished(device);
5952 put_ldev(device);
5953 return 0;
/* Handler for meta packets that are accepted but need no action. */
static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
{
	/* Nothing to do; report success. */
	return 0;
}
5961 struct meta_sock_cmd {
5962 size_t pkt_size;
5963 int (*fn)(struct drbd_connection *connection, struct packet_info *);
5966 static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout)
5968 long t;
5969 struct net_conf *nc;
5971 rcu_read_lock();
5972 nc = rcu_dereference(connection->net_conf);
5973 t = ping_timeout ? nc->ping_timeo : nc->ping_int;
5974 rcu_read_unlock();
5976 t *= HZ;
5977 if (ping_timeout)
5978 t /= 10;
5980 connection->meta.socket->sk->sk_rcvtimeo = t;
5983 static void set_ping_timeout(struct drbd_connection *connection)
5985 set_rcvtimeo(connection, 1);
5988 static void set_idle_timeout(struct drbd_connection *connection)
5990 set_rcvtimeo(connection, 0);
5993 static struct meta_sock_cmd ack_receiver_tbl[] = {
5994 [P_PING] = { 0, got_Ping },
5995 [P_PING_ACK] = { 0, got_PingAck },
5996 [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5997 [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5998 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5999 [P_SUPERSEDED] = { sizeof(struct p_block_ack), got_BlockAck },
6000 [P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck },
6001 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply },
6002 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply },
6003 [P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult },
6004 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck },
6005 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
6006 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync },
6007 [P_DELAY_PROBE] = { sizeof(struct p_delay_probe93), got_skip },
6008 [P_RS_CANCEL] = { sizeof(struct p_block_ack), got_NegRSDReply },
6009 [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
6010 [P_RETRY_WRITE] = { sizeof(struct p_block_ack), got_BlockAck },
6013 int drbd_ack_receiver(struct drbd_thread *thi)
6015 struct drbd_connection *connection = thi->connection;
6016 struct meta_sock_cmd *cmd = NULL;
6017 struct packet_info pi;
6018 unsigned long pre_recv_jif;
6019 int rv;
6020 void *buf = connection->meta.rbuf;
6021 int received = 0;
6022 unsigned int header_size = drbd_header_size(connection);
6023 int expect = header_size;
6024 bool ping_timeout_active = false;
6025 struct sched_param param = { .sched_priority = 2 };
6027 rv = sched_setscheduler(current, SCHED_RR, &param);
6028 if (rv < 0)
6029 drbd_err(connection, "drbd_ack_receiver: ERROR set priority, ret=%d\n", rv);
6031 while (get_t_state(thi) == RUNNING) {
6032 drbd_thread_current_set_cpu(thi);
6034 conn_reclaim_net_peer_reqs(connection);
6036 if (test_and_clear_bit(SEND_PING, &connection->flags)) {
6037 if (drbd_send_ping(connection)) {
6038 drbd_err(connection, "drbd_send_ping has failed\n");
6039 goto reconnect;
6041 set_ping_timeout(connection);
6042 ping_timeout_active = true;
6045 pre_recv_jif = jiffies;
6046 rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
6048 /* Note:
6049 * -EINTR (on meta) we got a signal
6050 * -EAGAIN (on meta) rcvtimeo expired
6051 * -ECONNRESET other side closed the connection
6052 * -ERESTARTSYS (on data) we got a signal
6053 * rv < 0 other than above: unexpected error!
6054 * rv == expected: full header or command
6055 * rv < expected: "woken" by signal during receive
6056 * rv == 0 : "connection shut down by peer"
6058 if (likely(rv > 0)) {
6059 received += rv;
6060 buf += rv;
6061 } else if (rv == 0) {
6062 if (test_bit(DISCONNECT_SENT, &connection->flags)) {
6063 long t;
6064 rcu_read_lock();
6065 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
6066 rcu_read_unlock();
6068 t = wait_event_timeout(connection->ping_wait,
6069 connection->cstate < C_WF_REPORT_PARAMS,
6071 if (t)
6072 break;
6074 drbd_err(connection, "meta connection shut down by peer.\n");
6075 goto reconnect;
6076 } else if (rv == -EAGAIN) {
6077 /* If the data socket received something meanwhile,
6078 * that is good enough: peer is still alive. */
6079 if (time_after(connection->last_received, pre_recv_jif))
6080 continue;
6081 if (ping_timeout_active) {
6082 drbd_err(connection, "PingAck did not arrive in time.\n");
6083 goto reconnect;
6085 set_bit(SEND_PING, &connection->flags);
6086 continue;
6087 } else if (rv == -EINTR) {
6088 /* maybe drbd_thread_stop(): the while condition will notice.
6089 * maybe woken for send_ping: we'll send a ping above,
6090 * and change the rcvtimeo */
6091 flush_signals(current);
6092 continue;
6093 } else {
6094 drbd_err(connection, "sock_recvmsg returned %d\n", rv);
6095 goto reconnect;
6098 if (received == expect && cmd == NULL) {
6099 if (decode_header(connection, connection->meta.rbuf, &pi))
6100 goto reconnect;
6101 cmd = &ack_receiver_tbl[pi.cmd];
6102 if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) || !cmd->fn) {
6103 drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
6104 cmdname(pi.cmd), pi.cmd);
6105 goto disconnect;
6107 expect = header_size + cmd->pkt_size;
6108 if (pi.size != expect - header_size) {
6109 drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
6110 pi.cmd, pi.size);
6111 goto reconnect;
6114 if (received == expect) {
6115 bool err;
6117 err = cmd->fn(connection, &pi);
6118 if (err) {
6119 drbd_err(connection, "%pf failed\n", cmd->fn);
6120 goto reconnect;
6123 connection->last_received = jiffies;
6125 if (cmd == &ack_receiver_tbl[P_PING_ACK]) {
6126 set_idle_timeout(connection);
6127 ping_timeout_active = false;
6130 buf = connection->meta.rbuf;
6131 received = 0;
6132 expect = header_size;
6133 cmd = NULL;
6137 if (0) {
6138 reconnect:
6139 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
6140 conn_md_sync(connection);
6142 if (0) {
6143 disconnect:
6144 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
6147 drbd_info(connection, "ack_receiver terminated\n");
6149 return 0;
6152 void drbd_send_acks_wf(struct work_struct *ws)
6154 struct drbd_peer_device *peer_device =
6155 container_of(ws, struct drbd_peer_device, send_acks_work);
6156 struct drbd_connection *connection = peer_device->connection;
6157 struct drbd_device *device = peer_device->device;
6158 struct net_conf *nc;
6159 int tcp_cork, err;
6161 rcu_read_lock();
6162 nc = rcu_dereference(connection->net_conf);
6163 tcp_cork = nc->tcp_cork;
6164 rcu_read_unlock();
6166 if (tcp_cork)
6167 drbd_tcp_cork(connection->meta.socket);
6169 err = drbd_finish_peer_reqs(device);
6170 kref_put(&device->kref, drbd_destroy_device);
6171 /* get is in drbd_endio_write_sec_final(). That is necessary to keep the
6172 struct work_struct send_acks_work alive, which is in the peer_device object */
6174 if (err) {
6175 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
6176 return;
6179 if (tcp_cork)
6180 drbd_tcp_uncork(connection->meta.socket);
6182 return;