// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/crc32c.h>
#include <linux/ctype.h>
#include <linux/highmem.h>
#include <linux/inet.h>
#include <linux/kthread.h>
#include <linux/net.h>
#include <linux/nsproxy.h>
#include <linux/sched/mm.h>
#include <linux/slab.h>
#include <linux/socket.h>
#include <linux/string.h>
#ifdef CONFIG_BLOCK
#include <linux/bio.h>
#endif	/* CONFIG_BLOCK */
#include <linux/dns_resolver.h>
#include <net/tcp.h>

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/libceph.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/export.h>

/*
 * Ceph uses the messenger to exchange ceph_msg messages with other
 * hosts in the system.  The messenger provides ordered and reliable
 * delivery.  We tolerate TCP disconnects by reconnecting (with
 * exponential backoff) in the case of a fault (disconnection, bad
 * crc, protocol error).  Acks allow sent messages to be discarded by
 * the sender.
 */

/*
 * We track the state of the socket on a given connection using
 * values defined below.  The transition to a new socket state is
 * handled by a function which verifies we aren't coming from an
 * unexpected state.
 *
 *   NEW         transient initial state;
 *               con_sock_state_init() -> CLOSED
 *   CLOSED      initialized, but no socket (and no TCP connection);
 *               con_sock_state_connecting() -> CONNECTING
 *   CONNECTING  socket created, TCP connect initiated;
 *               con_sock_state_connected() -> CONNECTED
 *   CONNECTED   TCP connection established;
 *               con_sock_state_closing() -> CLOSING
 *   CLOSING     socket event; await close;
 *               con_sock_state_closed() -> CLOSED
 */

/*
 * State values for ceph_connection->sock_state; NEW is assumed to be 0.
 */

#define CON_SOCK_STATE_NEW		0	/* -> CLOSED */
#define CON_SOCK_STATE_CLOSED		1	/* -> CONNECTING */
#define CON_SOCK_STATE_CONNECTING	2	/* -> CONNECTED or -> CLOSING */
#define CON_SOCK_STATE_CONNECTED	3	/* -> CLOSING or -> CLOSED */
#define CON_SOCK_STATE_CLOSING		4	/* -> CLOSED */

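/*
 * A normal connect/disconnect cycle walks the states in order, e.g.
 * (illustrative sequence only):
 *
 *	con_sock_state_init(con);	   NEW        -> CLOSED
 *	con_sock_state_connecting(con);	   CLOSED     -> CONNECTING
 *	con_sock_state_connected(con);	   CONNECTING -> CONNECTED
 *	con_sock_state_closing(con);	   CONNECTED  -> CLOSING
 *	con_sock_state_closed(con);	   CLOSING    -> CLOSED
 *
 * Each helper below uses atomic_xchg() so that an unexpected prior
 * state is detected and warned about rather than silently overwritten.
 */
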
static bool con_flag_valid(unsigned long con_flag)
{
	switch (con_flag) {
	case CEPH_CON_F_LOSSYTX:
	case CEPH_CON_F_KEEPALIVE_PENDING:
	case CEPH_CON_F_WRITE_PENDING:
	case CEPH_CON_F_SOCK_CLOSED:
	case CEPH_CON_F_BACKOFF:
		return true;
	default:
		return false;
	}
}

void ceph_con_flag_clear(struct ceph_connection *con, unsigned long con_flag)
{
	BUG_ON(!con_flag_valid(con_flag));

	clear_bit(con_flag, &con->flags);
}

void ceph_con_flag_set(struct ceph_connection *con, unsigned long con_flag)
{
	BUG_ON(!con_flag_valid(con_flag));

	set_bit(con_flag, &con->flags);
}

bool ceph_con_flag_test(struct ceph_connection *con, unsigned long con_flag)
{
	BUG_ON(!con_flag_valid(con_flag));

	return test_bit(con_flag, &con->flags);
}

bool ceph_con_flag_test_and_clear(struct ceph_connection *con,
				  unsigned long con_flag)
{
	BUG_ON(!con_flag_valid(con_flag));

	return test_and_clear_bit(con_flag, &con->flags);
}

bool ceph_con_flag_test_and_set(struct ceph_connection *con,
				unsigned long con_flag)
{
	BUG_ON(!con_flag_valid(con_flag));

	return test_and_set_bit(con_flag, &con->flags);
}

/* Slab caches for frequently-allocated structures */

static struct kmem_cache	*ceph_msg_cache;

#ifdef CONFIG_LOCKDEP
static struct lock_class_key socket_class;
#endif

static void queue_con(struct ceph_connection *con);
static void cancel_con(struct ceph_connection *con);
static void ceph_con_workfn(struct work_struct *);
static void con_fault(struct ceph_connection *con);

/*
 * Nicely render a sockaddr as a string.  An array of formatted
 * strings is used, to approximate reentrancy.
 */
#define ADDR_STR_COUNT_LOG	5	/* log2(# address strings in array) */
#define ADDR_STR_COUNT		(1 << ADDR_STR_COUNT_LOG)
#define ADDR_STR_COUNT_MASK	(ADDR_STR_COUNT - 1)
#define MAX_ADDR_STR_LEN	64	/* 54 is enough */

static char addr_str[ADDR_STR_COUNT][MAX_ADDR_STR_LEN];
static atomic_t addr_str_seq = ATOMIC_INIT(0);

struct page *ceph_zero_page;		/* used in certain error cases */

const char *ceph_pr_addr(const struct ceph_entity_addr *addr)
{
	int i;
	char *s;
	struct sockaddr_storage ss = addr->in_addr; /* align */
	struct sockaddr_in *in4 = (struct sockaddr_in *)&ss;
	struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)&ss;

	i = atomic_inc_return(&addr_str_seq) & ADDR_STR_COUNT_MASK;
	s = addr_str[i];

	switch (ss.ss_family) {
	case AF_INET:
		snprintf(s, MAX_ADDR_STR_LEN, "(%d)%pI4:%hu",
			 le32_to_cpu(addr->type), &in4->sin_addr,
			 ntohs(in4->sin_port));
		break;

	case AF_INET6:
		snprintf(s, MAX_ADDR_STR_LEN, "(%d)[%pI6c]:%hu",
			 le32_to_cpu(addr->type), &in6->sin6_addr,
			 ntohs(in6->sin6_port));
		break;

	default:
		snprintf(s, MAX_ADDR_STR_LEN, "(unknown sockaddr family %hu)",
			 ss.ss_family);
	}

	return s;
}
EXPORT_SYMBOL(ceph_pr_addr);

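/*
 * Example use (illustrative only): because the returned pointer comes
 * from a small rotating array, it should be consumed promptly, e.g.
 *
 *	dout("connecting to %s\n", ceph_pr_addr(&con->peer_addr));
 *
 * Up to ADDR_STR_COUNT results may be live concurrently; holding one
 * across many further calls risks it being overwritten.
 */
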
void ceph_encode_my_addr(struct ceph_messenger *msgr)
{
	if (!ceph_msgr2(from_msgr(msgr))) {
		memcpy(&msgr->my_enc_addr, &msgr->inst.addr,
		       sizeof(msgr->my_enc_addr));
		ceph_encode_banner_addr(&msgr->my_enc_addr);
	}
}

/*
 * work queue for all reading and writing to/from the socket.
 */
static struct workqueue_struct *ceph_msgr_wq;

static int ceph_msgr_slab_init(void)
{
	BUG_ON(ceph_msg_cache);
	ceph_msg_cache = KMEM_CACHE(ceph_msg, 0);
	if (!ceph_msg_cache)
		return -ENOMEM;

	return 0;
}

static void ceph_msgr_slab_exit(void)
{
	BUG_ON(!ceph_msg_cache);
	kmem_cache_destroy(ceph_msg_cache);
	ceph_msg_cache = NULL;
}

static void _ceph_msgr_exit(void)
{
	if (ceph_msgr_wq) {
		destroy_workqueue(ceph_msgr_wq);
		ceph_msgr_wq = NULL;
	}

	BUG_ON(!ceph_zero_page);
	put_page(ceph_zero_page);
	ceph_zero_page = NULL;

	ceph_msgr_slab_exit();
}

int __init ceph_msgr_init(void)
{
	if (ceph_msgr_slab_init())
		return -ENOMEM;

	BUG_ON(ceph_zero_page);
	ceph_zero_page = ZERO_PAGE(0);
	get_page(ceph_zero_page);

	/*
	 * The number of active work items is limited by the number of
	 * connections, so leave @max_active at default.
	 */
	ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_MEM_RECLAIM, 0);
	if (ceph_msgr_wq)
		return 0;

	pr_err("msgr_init failed to create workqueue\n");
	_ceph_msgr_exit();

	return -ENOMEM;
}

void ceph_msgr_exit(void)
{
	BUG_ON(ceph_msgr_wq == NULL);

	_ceph_msgr_exit();
}

void ceph_msgr_flush(void)
{
	flush_workqueue(ceph_msgr_wq);
}
EXPORT_SYMBOL(ceph_msgr_flush);

/* Connection socket state transition functions */

static void con_sock_state_init(struct ceph_connection *con)
{
	int old_state;

	old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED);
	if (WARN_ON(old_state != CON_SOCK_STATE_NEW))
		printk("%s: unexpected old state %d\n", __func__, old_state);
	dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
	     CON_SOCK_STATE_CLOSED);
}

static void con_sock_state_connecting(struct ceph_connection *con)
{
	int old_state;

	old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTING);
	if (WARN_ON(old_state != CON_SOCK_STATE_CLOSED))
		printk("%s: unexpected old state %d\n", __func__, old_state);
	dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
	     CON_SOCK_STATE_CONNECTING);
}

static void con_sock_state_connected(struct ceph_connection *con)
{
	int old_state;

	old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTED);
	if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTING))
		printk("%s: unexpected old state %d\n", __func__, old_state);
	dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
	     CON_SOCK_STATE_CONNECTED);
}

static void con_sock_state_closing(struct ceph_connection *con)
{
	int old_state;

	old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSING);
	if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTING &&
		    old_state != CON_SOCK_STATE_CONNECTED &&
		    old_state != CON_SOCK_STATE_CLOSING))
		printk("%s: unexpected old state %d\n", __func__, old_state);
	dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
	     CON_SOCK_STATE_CLOSING);
}

static void con_sock_state_closed(struct ceph_connection *con)
{
	int old_state;

	old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED);
	if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTED &&
		    old_state != CON_SOCK_STATE_CLOSING &&
		    old_state != CON_SOCK_STATE_CONNECTING &&
		    old_state != CON_SOCK_STATE_CLOSED))
		printk("%s: unexpected old state %d\n", __func__, old_state);
	dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
	     CON_SOCK_STATE_CLOSED);
}

/*
 * socket callback functions
 */

/* data available on socket, or listen socket received a connect */
static void ceph_sock_data_ready(struct sock *sk)
{
	struct ceph_connection *con = sk->sk_user_data;

	if (atomic_read(&con->msgr->stopping)) {
		return;
	}

	if (sk->sk_state != TCP_CLOSE_WAIT) {
		dout("%s %p state = %d, queueing work\n", __func__,
		     con, con->state);
		queue_con(con);
	}
}

/* socket has buffer space for writing */
static void ceph_sock_write_space(struct sock *sk)
{
	struct ceph_connection *con = sk->sk_user_data;

	/* only queue to workqueue if there is data we want to write,
	 * and there is sufficient space in the socket buffer to accept
	 * more data.  clear SOCK_NOSPACE so that ceph_sock_write_space()
	 * doesn't get called again until try_write() fills the socket
	 * buffer. See net/ipv4/tcp_input.c:tcp_check_space()
	 * and net/core/stream.c:sk_stream_write_space().
	 */
	if (ceph_con_flag_test(con, CEPH_CON_F_WRITE_PENDING)) {
		if (sk_stream_is_writeable(sk)) {
			dout("%s %p queueing write work\n", __func__, con);
			clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
			queue_con(con);
		}
	} else {
		dout("%s %p nothing to write\n", __func__, con);
	}
}

/* socket's state has changed */
static void ceph_sock_state_change(struct sock *sk)
{
	struct ceph_connection *con = sk->sk_user_data;

	dout("%s %p state = %d sk_state = %u\n", __func__,
	     con, con->state, sk->sk_state);

	switch (sk->sk_state) {
	case TCP_CLOSE:
		dout("%s TCP_CLOSE\n", __func__);
		fallthrough;
	case TCP_CLOSE_WAIT:
		dout("%s TCP_CLOSE_WAIT\n", __func__);
		con_sock_state_closing(con);
		ceph_con_flag_set(con, CEPH_CON_F_SOCK_CLOSED);
		queue_con(con);
		break;
	case TCP_ESTABLISHED:
		dout("%s TCP_ESTABLISHED\n", __func__);
		con_sock_state_connected(con);
		queue_con(con);
		break;
	default:	/* Everything else is uninteresting */
		break;
	}
}

/*
 * set up socket callbacks
 */
static void set_sock_callbacks(struct socket *sock,
			       struct ceph_connection *con)
{
	struct sock *sk = sock->sk;
	sk->sk_user_data = con;
	sk->sk_data_ready = ceph_sock_data_ready;
	sk->sk_write_space = ceph_sock_write_space;
	sk->sk_state_change = ceph_sock_state_change;
}

/*
 * initiate connection to a remote socket.
 */
int ceph_tcp_connect(struct ceph_connection *con)
{
	struct sockaddr_storage ss = con->peer_addr.in_addr; /* align */
	struct socket *sock;
	unsigned int noio_flag;
	int ret;

	dout("%s con %p peer_addr %s\n", __func__, con,
	     ceph_pr_addr(&con->peer_addr));
	BUG_ON(con->sock);

	/* sock_create_kern() allocates with GFP_KERNEL */
	noio_flag = memalloc_noio_save();
	ret = sock_create_kern(read_pnet(&con->msgr->net), ss.ss_family,
			       SOCK_STREAM, IPPROTO_TCP, &sock);
	memalloc_noio_restore(noio_flag);
	if (ret)
		return ret;
	sock->sk->sk_allocation = GFP_NOFS;

#ifdef CONFIG_LOCKDEP
	lockdep_set_class(&sock->sk->sk_lock, &socket_class);
#endif

	set_sock_callbacks(sock, con);

	con_sock_state_connecting(con);
	ret = sock->ops->connect(sock, (struct sockaddr *)&ss, sizeof(ss),
				 O_NONBLOCK);
	if (ret == -EINPROGRESS) {
		dout("connect %s EINPROGRESS sk_state = %u\n",
		     ceph_pr_addr(&con->peer_addr),
		     sock->sk->sk_state);
	} else if (ret < 0) {
		pr_err("connect %s error %d\n",
		       ceph_pr_addr(&con->peer_addr), ret);
		sock_release(sock);
		return ret;
	}

	if (ceph_test_opt(from_msgr(con->msgr), TCP_NODELAY))
		tcp_sock_set_nodelay(sock->sk);

	con->sock = sock;
	return 0;
}

/*
 * Shutdown/close the socket for the given connection.
 */
int ceph_con_close_socket(struct ceph_connection *con)
{
	int rc = 0;

	dout("%s con %p sock %p\n", __func__, con, con->sock);
	if (con->sock) {
		rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR);
		sock_release(con->sock);
		con->sock = NULL;
	}

	/*
	 * Forcibly clear the SOCK_CLOSED flag.  It gets set
	 * independent of the connection mutex, and we could have
	 * received a socket close event before we had the chance to
	 * shut the socket down.
	 */
	ceph_con_flag_clear(con, CEPH_CON_F_SOCK_CLOSED);

	con_sock_state_closed(con);
	return rc;
}

static void ceph_con_reset_protocol(struct ceph_connection *con)
{
	dout("%s con %p\n", __func__, con);

	ceph_con_close_socket(con);
	if (con->in_msg) {
		WARN_ON(con->in_msg->con != con);
		ceph_msg_put(con->in_msg);
		con->in_msg = NULL;
	}
	if (con->out_msg) {
		WARN_ON(con->out_msg->con != con);
		ceph_msg_put(con->out_msg);
		con->out_msg = NULL;
	}

	if (ceph_msgr2(from_msgr(con->msgr)))
		ceph_con_v2_reset_protocol(con);
	else
		ceph_con_v1_reset_protocol(con);
}

/*
 * Reset a connection.  Discard all incoming and outgoing messages
 * and clear *_seq state.
 */
static void ceph_msg_remove(struct ceph_msg *msg)
{
	list_del_init(&msg->list_head);

	ceph_msg_put(msg);
}

static void ceph_msg_remove_list(struct list_head *head)
{
	while (!list_empty(head)) {
		struct ceph_msg *msg = list_first_entry(head, struct ceph_msg,
							list_head);
		ceph_msg_remove(msg);
	}
}

void ceph_con_reset_session(struct ceph_connection *con)
{
	dout("%s con %p\n", __func__, con);

	WARN_ON(con->in_msg);
	WARN_ON(con->out_msg);
	ceph_msg_remove_list(&con->out_queue);
	ceph_msg_remove_list(&con->out_sent);
	con->out_seq = 0;
	con->in_seq = 0;
	con->in_seq_acked = 0;

	if (ceph_msgr2(from_msgr(con->msgr)))
		ceph_con_v2_reset_session(con);
	else
		ceph_con_v1_reset_session(con);
}

/*
 * mark a peer down.  drop any open connections.
 */
void ceph_con_close(struct ceph_connection *con)
{
	mutex_lock(&con->mutex);
	dout("con_close %p peer %s\n", con, ceph_pr_addr(&con->peer_addr));
	con->state = CEPH_CON_S_CLOSED;

	ceph_con_flag_clear(con, CEPH_CON_F_LOSSYTX);	/* so we retry next
							   connect */
	ceph_con_flag_clear(con, CEPH_CON_F_KEEPALIVE_PENDING);
	ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING);
	ceph_con_flag_clear(con, CEPH_CON_F_BACKOFF);

	ceph_con_reset_protocol(con);
	ceph_con_reset_session(con);
	cancel_con(con);
	mutex_unlock(&con->mutex);
}
EXPORT_SYMBOL(ceph_con_close);

/*
 * Reopen a closed connection, with a new peer address.
 */
void ceph_con_open(struct ceph_connection *con,
		   __u8 entity_type, __u64 entity_num,
		   struct ceph_entity_addr *addr)
{
	mutex_lock(&con->mutex);
	dout("con_open %p %s\n", con, ceph_pr_addr(addr));

	WARN_ON(con->state != CEPH_CON_S_CLOSED);
	con->state = CEPH_CON_S_PREOPEN;

	con->peer_name.type = (__u8) entity_type;
	con->peer_name.num = cpu_to_le64(entity_num);

	memcpy(&con->peer_addr, addr, sizeof(*addr));
	con->delay = 0;      /* reset backoff memory */
	mutex_unlock(&con->mutex);
	queue_con(con);
}
EXPORT_SYMBOL(ceph_con_open);

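/*
 * Typical connection lifecycle as driven by a messenger client
 * (illustrative sketch only; "ops", "osd_num" and "addr" stand in for
 * caller-provided values):
 *
 *	ceph_con_init(con, private, ops, msgr);		state: CLOSED
 *	ceph_con_open(con, CEPH_ENTITY_TYPE_OSD,
 *		      osd_num, addr);			state: PREOPEN
 *	ceph_con_send(con, msg);	queued; sent by ceph_con_workfn()
 *	...
 *	ceph_con_close(con);				state: CLOSED
 *
 * All socket I/O happens in ceph_con_workfn(); these entry points only
 * change state and queue work.
 */
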
/*
 * return true if this connection ever successfully opened
 */
bool ceph_con_opened(struct ceph_connection *con)
{
	if (ceph_msgr2(from_msgr(con->msgr)))
		return ceph_con_v2_opened(con);

	return ceph_con_v1_opened(con);
}

/*
 * initialize a new connection.
 */
void ceph_con_init(struct ceph_connection *con, void *private,
		   const struct ceph_connection_operations *ops,
		   struct ceph_messenger *msgr)
{
	dout("con_init %p\n", con);
	memset(con, 0, sizeof(*con));
	con->private = private;
	con->ops = ops;
	con->msgr = msgr;

	con_sock_state_init(con);

	mutex_init(&con->mutex);
	INIT_LIST_HEAD(&con->out_queue);
	INIT_LIST_HEAD(&con->out_sent);
	INIT_DELAYED_WORK(&con->work, ceph_con_workfn);

	con->state = CEPH_CON_S_CLOSED;
}
EXPORT_SYMBOL(ceph_con_init);

/*
 * We maintain a global counter to order connection attempts.  Get
 * a unique seq greater than @gt.
 */
u32 ceph_get_global_seq(struct ceph_messenger *msgr, u32 gt)
{
	u32 ret;

	spin_lock(&msgr->global_seq_lock);
	if (msgr->global_seq < gt)
		msgr->global_seq = gt;
	ret = ++msgr->global_seq;
	spin_unlock(&msgr->global_seq_lock);

	return ret;
}

/*
 * Discard messages that have been acked by the server.
 */
void ceph_con_discard_sent(struct ceph_connection *con, u64 ack_seq)
{
	struct ceph_msg *msg;
	u64 seq;

	dout("%s con %p ack_seq %llu\n", __func__, con, ack_seq);
	while (!list_empty(&con->out_sent)) {
		msg = list_first_entry(&con->out_sent, struct ceph_msg,
				       list_head);
		WARN_ON(msg->needs_out_seq);
		seq = le64_to_cpu(msg->hdr.seq);
		if (seq > ack_seq)
			break;

		dout("%s con %p discarding msg %p seq %llu\n", __func__, con,
		     msg, seq);
		ceph_msg_remove(msg);
	}
}

/*
 * Discard messages that have been requeued in con_fault(), up to
 * reconnect_seq.  This avoids gratuitously resending messages that
 * the server had received and handled prior to reconnect.
 */
void ceph_con_discard_requeued(struct ceph_connection *con, u64 reconnect_seq)
{
	struct ceph_msg *msg;
	u64 seq;

	dout("%s con %p reconnect_seq %llu\n", __func__, con, reconnect_seq);
	while (!list_empty(&con->out_queue)) {
		msg = list_first_entry(&con->out_queue, struct ceph_msg,
				       list_head);
		if (msg->needs_out_seq)
			break;
		seq = le64_to_cpu(msg->hdr.seq);
		if (seq > reconnect_seq)
			break;

		dout("%s con %p discarding msg %p seq %llu\n", __func__, con,
		     msg, seq);
		ceph_msg_remove(msg);
	}
}

#ifdef CONFIG_BLOCK

/*
 * For a bio data item, a piece is whatever remains of the next
 * entry in the current bio iovec, or the first entry in the next
 * bio in the list.
 */
static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data_cursor *cursor,
					  size_t length)
{
	struct ceph_msg_data *data = cursor->data;
	struct ceph_bio_iter *it = &cursor->bio_iter;

	cursor->resid = min_t(size_t, length, data->bio_length);
	*it = data->bio_pos;
	if (cursor->resid < it->iter.bi_size)
		it->iter.bi_size = cursor->resid;

	BUG_ON(cursor->resid < bio_iter_len(it->bio, it->iter));
	cursor->last_piece = cursor->resid == bio_iter_len(it->bio, it->iter);
}

static struct page *ceph_msg_data_bio_next(struct ceph_msg_data_cursor *cursor,
						size_t *page_offset,
						size_t *length)
{
	struct bio_vec bv = bio_iter_iovec(cursor->bio_iter.bio,
					   cursor->bio_iter.iter);

	*page_offset = bv.bv_offset;
	*length = bv.bv_len;
	return bv.bv_page;
}

static bool ceph_msg_data_bio_advance(struct ceph_msg_data_cursor *cursor,
					size_t bytes)
{
	struct ceph_bio_iter *it = &cursor->bio_iter;
	struct page *page = bio_iter_page(it->bio, it->iter);

	BUG_ON(bytes > cursor->resid);
	BUG_ON(bytes > bio_iter_len(it->bio, it->iter));
	cursor->resid -= bytes;
	bio_advance_iter(it->bio, &it->iter, bytes);

	if (!cursor->resid) {
		BUG_ON(!cursor->last_piece);
		return false;   /* no more data */
	}

	if (!bytes || (it->iter.bi_size && it->iter.bi_bvec_done &&
		       page == bio_iter_page(it->bio, it->iter)))
		return false;	/* more bytes to process in this segment */

	if (!it->iter.bi_size) {
		it->bio = it->bio->bi_next;
		it->iter = it->bio->bi_iter;
		if (cursor->resid < it->iter.bi_size)
			it->iter.bi_size = cursor->resid;
	}

	BUG_ON(cursor->last_piece);
	BUG_ON(cursor->resid < bio_iter_len(it->bio, it->iter));
	cursor->last_piece = cursor->resid == bio_iter_len(it->bio, it->iter);
	return true;
}
#endif /* CONFIG_BLOCK */

static void ceph_msg_data_bvecs_cursor_init(struct ceph_msg_data_cursor *cursor,
					size_t length)
{
	struct ceph_msg_data *data = cursor->data;
	struct bio_vec *bvecs = data->bvec_pos.bvecs;

	cursor->resid = min_t(size_t, length, data->bvec_pos.iter.bi_size);
	cursor->bvec_iter = data->bvec_pos.iter;
	cursor->bvec_iter.bi_size = cursor->resid;

	BUG_ON(cursor->resid < bvec_iter_len(bvecs, cursor->bvec_iter));
	cursor->last_piece =
	    cursor->resid == bvec_iter_len(bvecs, cursor->bvec_iter);
}

static struct page *ceph_msg_data_bvecs_next(struct ceph_msg_data_cursor *cursor,
					size_t *page_offset,
					size_t *length)
{
	struct bio_vec bv = bvec_iter_bvec(cursor->data->bvec_pos.bvecs,
					   cursor->bvec_iter);

	*page_offset = bv.bv_offset;
	*length = bv.bv_len;
	return bv.bv_page;
}

static bool ceph_msg_data_bvecs_advance(struct ceph_msg_data_cursor *cursor,
					size_t bytes)
{
	struct bio_vec *bvecs = cursor->data->bvec_pos.bvecs;
	struct page *page = bvec_iter_page(bvecs, cursor->bvec_iter);

	BUG_ON(bytes > cursor->resid);
	BUG_ON(bytes > bvec_iter_len(bvecs, cursor->bvec_iter));
	cursor->resid -= bytes;
	bvec_iter_advance(bvecs, &cursor->bvec_iter, bytes);

	if (!cursor->resid) {
		BUG_ON(!cursor->last_piece);
		return false;   /* no more data */
	}

	if (!bytes || (cursor->bvec_iter.bi_bvec_done &&
		       page == bvec_iter_page(bvecs, cursor->bvec_iter)))
		return false;	/* more bytes to process in this segment */

	BUG_ON(cursor->last_piece);
	BUG_ON(cursor->resid < bvec_iter_len(bvecs, cursor->bvec_iter));
	cursor->last_piece =
	    cursor->resid == bvec_iter_len(bvecs, cursor->bvec_iter);
	return true;
}

/*
 * For a page array, a piece comes from the first page in the array
 * that has not already been fully consumed.
 */
static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data_cursor *cursor,
					size_t length)
{
	struct ceph_msg_data *data = cursor->data;
	int page_count;

	BUG_ON(data->type != CEPH_MSG_DATA_PAGES);

	BUG_ON(!data->pages);
	BUG_ON(!data->length);

	cursor->resid = min(length, data->length);
	page_count = calc_pages_for(data->alignment, (u64)data->length);
	cursor->page_offset = data->alignment & ~PAGE_MASK;
	cursor->page_index = 0;
	BUG_ON(page_count > (int)USHRT_MAX);
	cursor->page_count = (unsigned short)page_count;
	BUG_ON(length > SIZE_MAX - cursor->page_offset);
	cursor->last_piece = cursor->page_offset + cursor->resid <= PAGE_SIZE;
}

static struct page *
ceph_msg_data_pages_next(struct ceph_msg_data_cursor *cursor,
					size_t *page_offset, size_t *length)
{
	struct ceph_msg_data *data = cursor->data;

	BUG_ON(data->type != CEPH_MSG_DATA_PAGES);

	BUG_ON(cursor->page_index >= cursor->page_count);
	BUG_ON(cursor->page_offset >= PAGE_SIZE);

	*page_offset = cursor->page_offset;
	if (cursor->last_piece)
		*length = cursor->resid;
	else
		*length = PAGE_SIZE - *page_offset;

	return data->pages[cursor->page_index];
}

static bool ceph_msg_data_pages_advance(struct ceph_msg_data_cursor *cursor,
						size_t bytes)
{
	BUG_ON(cursor->data->type != CEPH_MSG_DATA_PAGES);

	BUG_ON(cursor->page_offset + bytes > PAGE_SIZE);

	/* Advance the cursor page offset */

	cursor->resid -= bytes;
	cursor->page_offset = (cursor->page_offset + bytes) & ~PAGE_MASK;
	if (!bytes || cursor->page_offset)
		return false;	/* more bytes to process in the current page */

	if (!cursor->resid)
		return false;   /* no more data */

	/* Move on to the next page; offset is already at 0 */

	BUG_ON(cursor->page_index >= cursor->page_count);
	cursor->page_index++;
	cursor->last_piece = cursor->resid <= PAGE_SIZE;

	return true;
}

/*
 * For a pagelist, a piece is whatever remains to be consumed in the
 * first page in the list, or the front of the next page.
 */
static void
ceph_msg_data_pagelist_cursor_init(struct ceph_msg_data_cursor *cursor,
					size_t length)
{
	struct ceph_msg_data *data = cursor->data;
	struct ceph_pagelist *pagelist;
	struct page *page;

	BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);

	pagelist = data->pagelist;
	BUG_ON(!pagelist);

	if (!length)
		return;		/* pagelist can be assigned but empty */

	BUG_ON(list_empty(&pagelist->head));
	page = list_first_entry(&pagelist->head, struct page, lru);

	cursor->resid = min(length, pagelist->length);
	cursor->page = page;
	cursor->offset = 0;
	cursor->last_piece = cursor->resid <= PAGE_SIZE;
}

static struct page *
ceph_msg_data_pagelist_next(struct ceph_msg_data_cursor *cursor,
				size_t *page_offset, size_t *length)
{
	struct ceph_msg_data *data = cursor->data;
	struct ceph_pagelist *pagelist;

	BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);

	pagelist = data->pagelist;
	BUG_ON(!pagelist);

	BUG_ON(!cursor->page);
	BUG_ON(cursor->offset + cursor->resid != pagelist->length);

	/* offset of first page in pagelist is always 0 */
	*page_offset = cursor->offset & ~PAGE_MASK;
	if (cursor->last_piece)
		*length = cursor->resid;
	else
		*length = PAGE_SIZE - *page_offset;

	return cursor->page;
}

static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor,
						size_t bytes)
{
	struct ceph_msg_data *data = cursor->data;
	struct ceph_pagelist *pagelist;

	BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);

	pagelist = data->pagelist;
	BUG_ON(!pagelist);

	BUG_ON(cursor->offset + cursor->resid != pagelist->length);
	BUG_ON((cursor->offset & ~PAGE_MASK) + bytes > PAGE_SIZE);

	/* Advance the cursor offset */

	cursor->resid -= bytes;
	cursor->offset += bytes;
	/* offset of first page in pagelist is always 0 */
	if (!bytes || cursor->offset & ~PAGE_MASK)
		return false;	/* more bytes to process in the current page */

	if (!cursor->resid)
		return false;   /* no more data */

	/* Move on to the next page */

	BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head));
	cursor->page = list_next_entry(cursor->page, lru);
	cursor->last_piece = cursor->resid <= PAGE_SIZE;

	return true;
}

/*
 * Message data is handled (sent or received) in pieces, where each
 * piece resides on a single page.  The network layer might not
 * consume an entire piece at once.  A data item's cursor keeps
 * track of which piece is next to process and how much remains to
 * be processed in that piece.  It also tracks whether the current
 * piece is the last one in the data item.
 */
static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor)
{
	size_t length = cursor->total_resid;

	switch (cursor->data->type) {
	case CEPH_MSG_DATA_PAGELIST:
		ceph_msg_data_pagelist_cursor_init(cursor, length);
		break;
	case CEPH_MSG_DATA_PAGES:
		ceph_msg_data_pages_cursor_init(cursor, length);
		break;
#ifdef CONFIG_BLOCK
	case CEPH_MSG_DATA_BIO:
		ceph_msg_data_bio_cursor_init(cursor, length);
		break;
#endif /* CONFIG_BLOCK */
	case CEPH_MSG_DATA_BVECS:
		ceph_msg_data_bvecs_cursor_init(cursor, length);
		break;
	case CEPH_MSG_DATA_NONE:
	default:
		/* BUG(); */
		break;
	}
	cursor->need_crc = true;
}

void ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor,
			       struct ceph_msg *msg, size_t length)
{
	BUG_ON(!length);
	BUG_ON(length > msg->data_length);
	BUG_ON(!msg->num_data_items);

	cursor->total_resid = length;
	cursor->data = msg->data;

	__ceph_msg_data_cursor_init(cursor);
}

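/*
 * A sketch of how the read/write paths drive a cursor (illustrative
 * only; "msg" and do_io() stand in for caller state and actual socket
 * I/O, which may consume less than a full piece):
 *
 *	struct ceph_msg_data_cursor cursor;
 *	struct page *page;
 *	size_t off, len, sent;
 *
 *	ceph_msg_data_cursor_init(&cursor, msg, msg->data_length);
 *	while (cursor.total_resid) {
 *		page = ceph_msg_data_next(&cursor, &off, &len, NULL);
 *		sent = do_io(page, off, len);
 *		ceph_msg_data_advance(&cursor, sent);
 *	}
 */
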
/*
 * Return the page containing the next piece to process for a given
 * data item, and supply the page offset and length of that piece.
 * Indicate whether this is the last piece in this data item.
 */
struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
				size_t *page_offset, size_t *length,
				bool *last_piece)
{
	struct page *page;

	switch (cursor->data->type) {
	case CEPH_MSG_DATA_PAGELIST:
		page = ceph_msg_data_pagelist_next(cursor, page_offset, length);
		break;
	case CEPH_MSG_DATA_PAGES:
		page = ceph_msg_data_pages_next(cursor, page_offset, length);
		break;
#ifdef CONFIG_BLOCK
	case CEPH_MSG_DATA_BIO:
		page = ceph_msg_data_bio_next(cursor, page_offset, length);
		break;
#endif /* CONFIG_BLOCK */
	case CEPH_MSG_DATA_BVECS:
		page = ceph_msg_data_bvecs_next(cursor, page_offset, length);
		break;
	case CEPH_MSG_DATA_NONE:
	default:
		page = NULL;
		break;
	}

	BUG_ON(!page);
	BUG_ON(*page_offset + *length > PAGE_SIZE);
	BUG_ON(!*length);
	BUG_ON(*length > cursor->resid);
	if (last_piece)
		*last_piece = cursor->last_piece;

	return page;
}

/*
 * Advance the cursor by @bytes.  If the current data item is
 * exhausted and more data remains, reinitialize the cursor for the
 * next data item.
 */
void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor, size_t bytes)
{
	bool new_piece;

	BUG_ON(bytes > cursor->resid);
	switch (cursor->data->type) {
	case CEPH_MSG_DATA_PAGELIST:
		new_piece = ceph_msg_data_pagelist_advance(cursor, bytes);
		break;
	case CEPH_MSG_DATA_PAGES:
		new_piece = ceph_msg_data_pages_advance(cursor, bytes);
		break;
#ifdef CONFIG_BLOCK
	case CEPH_MSG_DATA_BIO:
		new_piece = ceph_msg_data_bio_advance(cursor, bytes);
		break;
#endif /* CONFIG_BLOCK */
	case CEPH_MSG_DATA_BVECS:
		new_piece = ceph_msg_data_bvecs_advance(cursor, bytes);
		break;
	case CEPH_MSG_DATA_NONE:
	default:
		BUG();
		break;
	}
	cursor->total_resid -= bytes;

	if (!cursor->resid && cursor->total_resid) {
		WARN_ON(!cursor->last_piece);
		cursor->data++;
		__ceph_msg_data_cursor_init(cursor);
		new_piece = true;
	}
	cursor->need_crc = new_piece;
}

u32 ceph_crc32c_page(u32 crc, struct page *page, unsigned int page_offset,
		     unsigned int length)
{
	char *kaddr;

	kaddr = kmap(page);
	BUG_ON(kaddr == NULL);
	crc = crc32c(crc, kaddr + page_offset, length);
	kunmap(page);

	return crc;
}

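/*
 * This helper combines naturally with the cursor above: a sender can
 * accumulate a running data CRC one piece at a time, roughly (sketch
 * only, not the exact messenger caller):
 *
 *	crc = ceph_crc32c_page(crc, page, off, len);
 *	ceph_msg_data_advance(&cursor, len);
 *
 * cursor->need_crc then indicates whether the cursor moved on to a
 * new piece whose bytes still need to be folded into the CRC.
 */
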
bool ceph_addr_is_blank(const struct ceph_entity_addr *addr)
{
	struct sockaddr_storage ss = addr->in_addr; /* align */
	struct in_addr *addr4 = &((struct sockaddr_in *)&ss)->sin_addr;
	struct in6_addr *addr6 = &((struct sockaddr_in6 *)&ss)->sin6_addr;

	switch (ss.ss_family) {
	case AF_INET:
		return addr4->s_addr == htonl(INADDR_ANY);
	case AF_INET6:
		return ipv6_addr_any(addr6);
	default:
		return true;
	}
}

int ceph_addr_port(const struct ceph_entity_addr *addr)
{
	switch (get_unaligned(&addr->in_addr.ss_family)) {
	case AF_INET:
		return ntohs(get_unaligned(&((struct sockaddr_in *)&addr->in_addr)->sin_port));
	case AF_INET6:
		return ntohs(get_unaligned(&((struct sockaddr_in6 *)&addr->in_addr)->sin6_port));
	}
	return 0;
}

void ceph_addr_set_port(struct ceph_entity_addr *addr, int p)
{
	switch (get_unaligned(&addr->in_addr.ss_family)) {
	case AF_INET:
		put_unaligned(htons(p), &((struct sockaddr_in *)&addr->in_addr)->sin_port);
		break;
	case AF_INET6:
		put_unaligned(htons(p), &((struct sockaddr_in6 *)&addr->in_addr)->sin6_port);
		break;
	}
}

/*
 * Unlike other *_pton function semantics, zero indicates success.
 */
static int ceph_pton(const char *str, size_t len, struct ceph_entity_addr *addr,
		char delim, const char **ipend)
{
	memset(&addr->in_addr, 0, sizeof(addr->in_addr));

	if (in4_pton(str, len, (u8 *)&((struct sockaddr_in *)&addr->in_addr)->sin_addr.s_addr, delim, ipend)) {
		put_unaligned(AF_INET, &addr->in_addr.ss_family);
		return 0;
	}

	if (in6_pton(str, len, (u8 *)&((struct sockaddr_in6 *)&addr->in_addr)->sin6_addr.s6_addr, delim, ipend)) {
		put_unaligned(AF_INET6, &addr->in_addr.ss_family);
		return 0;
	}

	return -EINVAL;
}

/*
 * Extract hostname string and resolve using kernel DNS facility.
 */
#ifdef CONFIG_CEPH_LIB_USE_DNS_RESOLVER
static int ceph_dns_resolve_name(const char *name, size_t namelen,
		struct ceph_entity_addr *addr, char delim, const char **ipend)
{
	const char *end, *delim_p;
	char *colon_p, *ip_addr = NULL;
	int ip_len, ret;

	/*
	 * The end of the hostname occurs immediately preceding the delimiter or
	 * the port marker (':') where the delimiter takes precedence.
	 */
	delim_p = memchr(name, delim, namelen);
	colon_p = memchr(name, ':', namelen);

	if (delim_p && colon_p)
		end = delim_p < colon_p ? delim_p : colon_p;
	else if (!delim_p && colon_p)
		end = colon_p;
	else {
		end = delim_p;
		if (!end) /* case: hostname:/ */
			end = name + namelen;
	}

	if (end <= name)
		return -EINVAL;

	/* do dns_resolve upcall */
	ip_len = dns_query(current->nsproxy->net_ns,
			   NULL, name, end - name, NULL, &ip_addr, NULL, false);
	if (ip_len > 0)
		ret = ceph_pton(ip_addr, ip_len, addr, -1, NULL);
	else
		ret = -ESRCH;

	kfree(ip_addr);

	*ipend = end;

	pr_info("resolve '%.*s' (ret=%d): %s\n", (int)(end - name), name,
		ret, ret ? "failed" : ceph_pr_addr(addr));

	return ret;
}
#else
static inline int ceph_dns_resolve_name(const char *name, size_t namelen,
		struct ceph_entity_addr *addr, char delim, const char **ipend)
{
	return -EIO;
}
#endif

/*
 * Parse a server name (IP or hostname). If a valid IP address is not found
 * then try to extract a hostname to resolve using userspace DNS upcall.
 */
static int ceph_parse_server_name(const char *name, size_t namelen,
		struct ceph_entity_addr *addr, char delim, const char **ipend)
{
	int ret;

	ret = ceph_pton(name, namelen, addr, delim, ipend);
	if (ret)
		ret = ceph_dns_resolve_name(name, namelen, addr, delim, ipend);

	return ret;
}

/*
 * Parse an ip[:port] list into an addr array.  Use the default
 * monitor port if a port isn't specified.
 */
int ceph_parse_ips(const char *c, const char *end,
		   struct ceph_entity_addr *addr,
		   int max_count, int *count)
{
	int i, ret = -EINVAL;
	const char *p = c;

	dout("parse_ips on '%.*s'\n", (int)(end-c), c);
	for (i = 0; i < max_count; i++) {
		char delim = ',';
		const char *ipend;
		int port;

		if (p < end && *p == '[') {
			delim = ']';
			p++;
		}

		ret = ceph_parse_server_name(p, end - p, &addr[i], delim,
					     &ipend);
		if (ret)
			goto bad;
		ret = -EINVAL;

		p = ipend;

		if (delim == ']') {
			if (*p != ']') {
				dout("missing matching ']'\n");
				goto bad;
			}
			p++;
		}

		/* port? */
		if (p < end && *p == ':') {
			port = 0;
			p++;
			while (p < end && *p >= '0' && *p <= '9') {
				port = (port * 10) + (*p - '0');
				p++;
			}
			if (port == 0)
				port = CEPH_MON_PORT;
			else if (port > 65535)
				goto bad;
		} else {
			port = CEPH_MON_PORT;
		}

		ceph_addr_set_port(&addr[i], port);
		/*
		 * We want the type to be set according to ms_mode
		 * option, but options are normally parsed after mon
		 * addresses.  Rather than complicating parsing, set
		 * to LEGACY and override in build_initial_monmap()
		 * for mon addresses and ceph_messenger_init() for
		 * ip option.
		 */
		addr[i].type = CEPH_ENTITY_ADDR_TYPE_LEGACY;
		addr[i].nonce = 0;

		dout("parse_ips got %s\n", ceph_pr_addr(&addr[i]));

		if (p == end)
			break;
		if (*p != ',')
			goto bad;
		p++;
	}

	if (p != end)
		goto bad;

	if (count)
		*count = i + 1;
	return 0;

bad:
	return ret;
}

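/*
 * Examples of accepted input (illustrative):
 *
 *	"1.2.3.4"		-> one addr, port CEPH_MON_PORT
 *	"1.2.3.4:6789"		-> one addr, explicit port
 *	"[::1]:6789,10.0.0.1"	-> IPv6 in brackets, list of two
 *
 * Hostnames are accepted too when CONFIG_CEPH_LIB_USE_DNS_RESOLVER is
 * enabled, via the ceph_dns_resolve_name() upcall above.
 */
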
/*
 * Process message.  This happens in the worker thread.  The callback should
 * be careful not to do anything that waits on other incoming messages or it
 * may deadlock.
 */
void ceph_con_process_message(struct ceph_connection *con)
{
	struct ceph_msg *msg = con->in_msg;

	BUG_ON(con->in_msg->con != con);
	con->in_msg = NULL;

	/* if first message, set peer_name */
	if (con->peer_name.type == 0)
		con->peer_name = msg->hdr.src;

	con->in_seq++;
	mutex_unlock(&con->mutex);

	dout("===== %p %llu from %s%lld %d=%s len %d+%d+%d (%u %u %u) =====\n",
	     msg, le64_to_cpu(msg->hdr.seq),
	     ENTITY_NAME(msg->hdr.src),
	     le16_to_cpu(msg->hdr.type),
	     ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
	     le32_to_cpu(msg->hdr.front_len),
	     le32_to_cpu(msg->hdr.middle_len),
	     le32_to_cpu(msg->hdr.data_len),
	     con->in_front_crc, con->in_middle_crc, con->in_data_crc);
	con->ops->dispatch(con, msg);

	mutex_lock(&con->mutex);
}

/*
 * Atomically queue work on a connection after the specified delay.
 * Bump @con reference to avoid races with connection teardown.
 * Returns 0 if work was queued, or an error code otherwise.
 */
static int queue_con_delay(struct ceph_connection *con, unsigned long delay)
{
	if (!con->ops->get(con)) {
		dout("%s %p ref count 0\n", __func__, con);
		return -ENOENT;
	}

	if (delay >= HZ)
		delay = round_jiffies_relative(delay);

	dout("%s %p %lu\n", __func__, con, delay);
	if (!queue_delayed_work(ceph_msgr_wq, &con->work, delay)) {
		dout("%s %p - already queued\n", __func__, con);
		con->ops->put(con);
		return -EBUSY;
	}

	return 0;
}

static void queue_con(struct ceph_connection *con)
{
	(void) queue_con_delay(con, 0);
}

static void cancel_con(struct ceph_connection *con)
{
	if (cancel_delayed_work(&con->work)) {
		dout("%s %p\n", __func__, con);
		con->ops->put(con);
	}
}

static bool con_sock_closed(struct ceph_connection *con)
{
	if (!ceph_con_flag_test_and_clear(con, CEPH_CON_F_SOCK_CLOSED))
		return false;

#define CASE(x)								\
	case CEPH_CON_S_ ## x:						\
		con->error_msg = "socket closed (con state " #x ")";	\
		break;

	switch (con->state) {
	CASE(CLOSED);
	CASE(PREOPEN);
	CASE(V1_BANNER);
	CASE(V1_CONNECT_MSG);
	CASE(V2_BANNER_PREFIX);
	CASE(V2_BANNER_PAYLOAD);
	CASE(V2_HELLO);
	CASE(V2_AUTH);
	CASE(V2_AUTH_SIGNATURE);
	CASE(V2_SESSION_CONNECT);
	CASE(V2_SESSION_RECONNECT);
	CASE(OPEN);
	CASE(STANDBY);
	default:
		BUG();
	}
#undef CASE

	return true;
}

static bool con_backoff(struct ceph_connection *con)
{
	int ret;

	if (!ceph_con_flag_test_and_clear(con, CEPH_CON_F_BACKOFF))
		return false;

	ret = queue_con_delay(con, con->delay);
	if (ret) {
		dout("%s: con %p FAILED to back off %lu\n", __func__,
		     con, con->delay);
		BUG_ON(ret == -ENOENT);
		ceph_con_flag_set(con, CEPH_CON_F_BACKOFF);
	}

	return true;
}

/* Finish fault handling; con->mutex must *not* be held here */

static void con_fault_finish(struct ceph_connection *con)
{
	dout("%s %p\n", __func__, con);

	/*
	 * in case we faulted due to authentication, invalidate our
	 * current tickets so that we can get new ones.
	 */
	if (con->v1.auth_retry) {
		dout("auth_retry %d, invalidating\n", con->v1.auth_retry);
		if (con->ops->invalidate_authorizer)
			con->ops->invalidate_authorizer(con);
		con->v1.auth_retry = 0;
	}

	if (con->ops->fault)
		con->ops->fault(con);
}

/*
 * Do some work on a connection.  Drop a connection ref when we're done.
 */
static void ceph_con_workfn(struct work_struct *work)
{
	struct ceph_connection *con = container_of(work, struct ceph_connection,
						   work.work);
	bool fault;

	mutex_lock(&con->mutex);
	while (true) {
		int ret;

		if ((fault = con_sock_closed(con))) {
			dout("%s: con %p SOCK_CLOSED\n", __func__, con);
			break;
		}
		if (con_backoff(con)) {
			dout("%s: con %p BACKOFF\n", __func__, con);
			break;
		}
		if (con->state == CEPH_CON_S_STANDBY) {
			dout("%s: con %p STANDBY\n", __func__, con);
			break;
		}
		if (con->state == CEPH_CON_S_CLOSED) {
			dout("%s: con %p CLOSED\n", __func__, con);
			BUG_ON(con->sock);
			break;
		}
		if (con->state == CEPH_CON_S_PREOPEN) {
			dout("%s: con %p PREOPEN\n", __func__, con);
			BUG_ON(con->sock);
		}

		if (ceph_msgr2(from_msgr(con->msgr)))
			ret = ceph_con_v2_try_read(con);
		else
			ret = ceph_con_v1_try_read(con);
		if (ret < 0) {
			if (ret == -EAGAIN)
				continue;
			if (!con->error_msg)
				con->error_msg = "socket error on read";
			fault = true;
			break;
		}

		if (ceph_msgr2(from_msgr(con->msgr)))
			ret = ceph_con_v2_try_write(con);
		else
			ret = ceph_con_v1_try_write(con);
		if (ret < 0) {
			if (ret == -EAGAIN)
				continue;
			if (!con->error_msg)
				con->error_msg = "socket error on write";
			fault = true;
		}

		break;	/* If we make it to here, we're done */
	}
	if (fault)
		con_fault(con);
	mutex_unlock(&con->mutex);

	if (fault)
		con_fault_finish(con);

	con->ops->put(con);
}

/*
 * Generic error/fault handler.  A retry mechanism is used with
 * exponential backoff
 */
static void con_fault(struct ceph_connection *con)
{
	dout("fault %p state %d to peer %s\n",
	     con, con->state, ceph_pr_addr(&con->peer_addr));

	pr_warn("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
		ceph_pr_addr(&con->peer_addr), con->error_msg);
	con->error_msg = NULL;

	WARN_ON(con->state == CEPH_CON_S_STANDBY ||
		con->state == CEPH_CON_S_CLOSED);

	ceph_con_reset_protocol(con);

	if (ceph_con_flag_test(con, CEPH_CON_F_LOSSYTX)) {
		dout("fault on LOSSYTX channel, marking CLOSED\n");
		con->state = CEPH_CON_S_CLOSED;
		return;
	}

	/* Requeue anything that hasn't been acked */
	list_splice_init(&con->out_sent, &con->out_queue);

	/* If there are no messages queued or keepalive pending, place
	 * the connection in a STANDBY state */
	if (list_empty(&con->out_queue) &&
	    !ceph_con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING)) {
		dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
		ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING);
		con->state = CEPH_CON_S_STANDBY;
	} else {
		/* retry after a delay. */
		con->state = CEPH_CON_S_PREOPEN;
		if (!con->delay) {
			con->delay = BASE_DELAY_INTERVAL;
		} else if (con->delay < MAX_DELAY_INTERVAL) {
			con->delay *= 2;
			if (con->delay > MAX_DELAY_INTERVAL)
				con->delay = MAX_DELAY_INTERVAL;
		}
		ceph_con_flag_set(con, CEPH_CON_F_BACKOFF);
		queue_con(con);
	}
}

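/*
 * The resulting retry schedule doubles con->delay on each consecutive
 * fault, i.e. roughly (illustrative; the actual constants come from
 * the messenger headers):
 *
 *	fault 1: delay = BASE_DELAY_INTERVAL
 *	fault 2: delay = 2 * BASE_DELAY_INTERVAL
 *	fault n: delay = min(2^(n-1) * BASE_DELAY_INTERVAL,
 *			     MAX_DELAY_INTERVAL)
 *
 * con->delay is reset to 0 in ceph_con_open(), so a successful
 * reconnect starts the ladder over.
 */
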
void ceph_messenger_reset_nonce(struct ceph_messenger *msgr)
{
	u32 nonce = le32_to_cpu(msgr->inst.addr.nonce) + 1000000;

	msgr->inst.addr.nonce = cpu_to_le32(nonce);
	ceph_encode_my_addr(msgr);
}

/*
 * initialize a new messenger instance
 */
void ceph_messenger_init(struct ceph_messenger *msgr,
			 struct ceph_entity_addr *myaddr)
{
	spin_lock_init(&msgr->global_seq_lock);

	if (myaddr) {
		memcpy(&msgr->inst.addr.in_addr, &myaddr->in_addr,
		       sizeof(msgr->inst.addr.in_addr));
		ceph_addr_set_port(&msgr->inst.addr, 0);
	}

	/*
	 * Since nautilus, clients are identified using type ANY.
	 * For msgr1, ceph_encode_banner_addr() munges it to NONE.
	 */
	msgr->inst.addr.type = CEPH_ENTITY_ADDR_TYPE_ANY;

	/* generate a random non-zero nonce */
	do {
		get_random_bytes(&msgr->inst.addr.nonce,
				 sizeof(msgr->inst.addr.nonce));
	} while (!msgr->inst.addr.nonce);
	ceph_encode_my_addr(msgr);

	atomic_set(&msgr->stopping, 0);
	write_pnet(&msgr->net, get_net(current->nsproxy->net_ns));

	dout("%s %p\n", __func__, msgr);
}

void ceph_messenger_fini(struct ceph_messenger *msgr)
{
	put_net(read_pnet(&msgr->net));
}

static void msg_con_set(struct ceph_msg *msg, struct ceph_connection *con)
{
	if (msg->con)
		msg->con->ops->put(msg->con);

	msg->con = con ? con->ops->get(con) : NULL;
	BUG_ON(msg->con != con);
}

static void clear_standby(struct ceph_connection *con)
{
	/* come back from STANDBY? */
	if (con->state == CEPH_CON_S_STANDBY) {
		dout("clear_standby %p and ++connect_seq\n", con);
		con->state = CEPH_CON_S_PREOPEN;
		con->v1.connect_seq++;
		WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_WRITE_PENDING));
		WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING));
	}
}

/*
 * Queue up an outgoing message on the given connection.
 *
 * Consumes a ref on @msg.
 */
void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
{
	/* set src+dst */
	msg->hdr.src = con->msgr->inst.name;
	BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len));
	msg->needs_out_seq = true;

	mutex_lock(&con->mutex);

	if (con->state == CEPH_CON_S_CLOSED) {
		dout("con_send %p closed, dropping %p\n", con, msg);
		ceph_msg_put(msg);
		mutex_unlock(&con->mutex);
		return;
	}

	msg_con_set(msg, con);

	BUG_ON(!list_empty(&msg->list_head));
	list_add_tail(&msg->list_head, &con->out_queue);
	dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg,
	     ENTITY_NAME(con->peer_name), le16_to_cpu(msg->hdr.type),
	     ceph_msg_type_name(le16_to_cpu(msg->hdr.type)),
	     le32_to_cpu(msg->hdr.front_len),
	     le32_to_cpu(msg->hdr.middle_len),
	     le32_to_cpu(msg->hdr.data_len));

	clear_standby(con);
	mutex_unlock(&con->mutex);

	/* if there wasn't anything waiting to send before, queue
	 * new work */
	if (!ceph_con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING))
		queue_con(con);
}
EXPORT_SYMBOL(ceph_con_send);

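/*
 * Example caller pattern (sketch; "reply" is a hypothetical message):
 *
 *	struct ceph_msg *reply;
 *
 *	reply = ceph_msg_new(CEPH_MSG_PING, 0, GFP_NOFS, false);
 *	if (reply)
 *		ceph_con_send(con, reply);	// consumes our ref
 *
 * The caller must not touch the message after this call unless it
 * took an extra reference with ceph_msg_get() first.
 */
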
/*
 * Revoke a message that was previously queued for send
 */
void ceph_msg_revoke(struct ceph_msg *msg)
{
	struct ceph_connection *con = msg->con;

	if (!con) {
		dout("%s msg %p null con\n", __func__, msg);
		return;			/* Message not in our possession */
	}

	mutex_lock(&con->mutex);
	if (list_empty(&msg->list_head)) {
		WARN_ON(con->out_msg == msg);
		dout("%s con %p msg %p not linked\n", __func__, con, msg);
		mutex_unlock(&con->mutex);
		return;
	}

	dout("%s con %p msg %p was linked\n", __func__, con, msg);
	msg->hdr.seq = 0;
	ceph_msg_remove(msg);

	if (con->out_msg == msg) {
		WARN_ON(con->state != CEPH_CON_S_OPEN);
		dout("%s con %p msg %p was sending\n", __func__, con, msg);
		if (ceph_msgr2(from_msgr(con->msgr)))
			ceph_con_v2_revoke(con);
		else
			ceph_con_v1_revoke(con);
		ceph_msg_put(con->out_msg);
		con->out_msg = NULL;
	} else {
		dout("%s con %p msg %p not current, out_msg %p\n", __func__,
		     con, msg, con->out_msg);
	}
	mutex_unlock(&con->mutex);
}

/*
 * Revoke a message that we may be reading data into
 */
void ceph_msg_revoke_incoming(struct ceph_msg *msg)
{
	struct ceph_connection *con = msg->con;

	if (!con) {
		dout("%s msg %p null con\n", __func__, msg);
		return;			/* Message not in our possession */
	}

	mutex_lock(&con->mutex);
	if (con->in_msg == msg) {
		WARN_ON(con->state != CEPH_CON_S_OPEN);
		dout("%s con %p msg %p was recving\n", __func__, con, msg);
		if (ceph_msgr2(from_msgr(con->msgr)))
			ceph_con_v2_revoke_incoming(con);
		else
			ceph_con_v1_revoke_incoming(con);
		ceph_msg_put(con->in_msg);
		con->in_msg = NULL;
	} else {
		dout("%s con %p msg %p not current, in_msg %p\n", __func__,
		     con, msg, con->in_msg);
	}
	mutex_unlock(&con->mutex);
}

/*
 * Queue a keepalive byte to ensure the tcp connection is alive.
 */
void ceph_con_keepalive(struct ceph_connection *con)
{
	dout("con_keepalive %p\n", con);
	mutex_lock(&con->mutex);
	clear_standby(con);
	ceph_con_flag_set(con, CEPH_CON_F_KEEPALIVE_PENDING);
	mutex_unlock(&con->mutex);

	if (!ceph_con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING))
		queue_con(con);
}
EXPORT_SYMBOL(ceph_con_keepalive);

bool ceph_con_keepalive_expired(struct ceph_connection *con,
				unsigned long interval)
{
	if (interval > 0 &&
	    (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2)) {
		struct timespec64 now;
		struct timespec64 ts;

		ktime_get_real_ts64(&now);
		jiffies_to_timespec64(interval, &ts);
		ts = timespec64_add(con->last_keepalive_ack, ts);
		return timespec64_compare(&now, &ts) >= 0;
	}

	return false;
}

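/*
 * Typical use by a client with a periodic tick (sketch only; the
 * interval value and the reaction to expiry are caller policy, not
 * defined here):
 *
 *	ceph_con_keepalive(con);
 *	if (ceph_con_keepalive_expired(con, interval))
 *		// fault/reconnect, per caller policy
 *
 * The expiry check only works against peers that advertise
 * CEPH_FEATURE_MSGR_KEEPALIVE2 and so echo keepalive timestamps.
 */
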
static struct ceph_msg_data *ceph_msg_data_add(struct ceph_msg *msg)
{
	BUG_ON(msg->num_data_items >= msg->max_data_items);
	return &msg->data[msg->num_data_items++];
}

static void ceph_msg_data_destroy(struct ceph_msg_data *data)
{
	if (data->type == CEPH_MSG_DATA_PAGES && data->own_pages) {
		int num_pages = calc_pages_for(data->alignment, data->length);

		ceph_release_page_vector(data->pages, num_pages);
	} else if (data->type == CEPH_MSG_DATA_PAGELIST) {
		ceph_pagelist_release(data->pagelist);
	}
}

void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
			     size_t length, size_t alignment, bool own_pages)
{
	struct ceph_msg_data *data;

	BUG_ON(!pages);
	BUG_ON(!length);

	data = ceph_msg_data_add(msg);
	data->type = CEPH_MSG_DATA_PAGES;
	data->pages = pages;
	data->length = length;
	data->alignment = alignment & ~PAGE_MASK;
	data->own_pages = own_pages;

	msg->data_length += length;
}
EXPORT_SYMBOL(ceph_msg_data_add_pages);

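/*
 * Example of attaching a page vector to an outgoing message (sketch;
 * error handling elided and "n" is a hypothetical page count):
 *
 *	struct page **pages = ceph_alloc_page_vector(n, GFP_NOFS);
 *
 *	msg = ceph_msg_new2(type, front_len, 1, GFP_NOFS, false);
 *	ceph_msg_data_add_pages(msg, pages, n * PAGE_SIZE, 0, true);
 *
 * With own_pages == true the vector is released in
 * ceph_msg_data_destroy() when the message's last ref is dropped.
 */
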
void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
				struct ceph_pagelist *pagelist)
{
	struct ceph_msg_data *data;

	BUG_ON(!pagelist);
	BUG_ON(!pagelist->length);

	data = ceph_msg_data_add(msg);
	data->type = CEPH_MSG_DATA_PAGELIST;
	refcount_inc(&pagelist->refcnt);
	data->pagelist = pagelist;

	msg->data_length += pagelist->length;
}
EXPORT_SYMBOL(ceph_msg_data_add_pagelist);

#ifdef	CONFIG_BLOCK
void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos,
			   u32 length)
{
	struct ceph_msg_data *data;

	data = ceph_msg_data_add(msg);
	data->type = CEPH_MSG_DATA_BIO;
	data->bio_pos = *bio_pos;
	data->bio_length = length;

	msg->data_length += length;
}
EXPORT_SYMBOL(ceph_msg_data_add_bio);
#endif	/* CONFIG_BLOCK */

void ceph_msg_data_add_bvecs(struct ceph_msg *msg,
			     struct ceph_bvec_iter *bvec_pos)
{
	struct ceph_msg_data *data;

	data = ceph_msg_data_add(msg);
	data->type = CEPH_MSG_DATA_BVECS;
	data->bvec_pos = *bvec_pos;

	msg->data_length += bvec_pos->iter.bi_size;
}
EXPORT_SYMBOL(ceph_msg_data_add_bvecs);

/*
 * construct a new message with given type, size
 * the new msg has a ref count of 1.
 */
struct ceph_msg *ceph_msg_new2(int type, int front_len, int max_data_items,
			       gfp_t flags, bool can_fail)
{
	struct ceph_msg *m;

	m = kmem_cache_zalloc(ceph_msg_cache, flags);
	if (m == NULL)
		goto out;

	m->hdr.type = cpu_to_le16(type);
	m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT);
	m->hdr.front_len = cpu_to_le32(front_len);

	INIT_LIST_HEAD(&m->list_head);
	kref_init(&m->kref);

	/* front */
	if (front_len) {
		m->front.iov_base = ceph_kvmalloc(front_len, flags);
		if (m->front.iov_base == NULL) {
			dout("ceph_msg_new can't allocate %d bytes\n",
			     front_len);
			goto out2;
		}
	} else {
		m->front.iov_base = NULL;
	}
	m->front_alloc_len = m->front.iov_len = front_len;

	if (max_data_items) {
		m->data = kmalloc_array(max_data_items, sizeof(*m->data),
					flags);
		if (!m->data)
			goto out2;

		m->max_data_items = max_data_items;
	}

	dout("ceph_msg_new %p front %d\n", m, front_len);
	return m;

out2:
	ceph_msg_put(m);
out:
	if (!can_fail) {
		pr_err("msg_new can't create type %d front %d\n", type,
		       front_len);
		WARN_ON(1);
	} else {
		dout("msg_new can't create type %d front %d\n", type,
		     front_len);
	}
	return NULL;
}
EXPORT_SYMBOL(ceph_msg_new2);

struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
			      bool can_fail)
{
	return ceph_msg_new2(type, front_len, 0, flags, can_fail);
}
EXPORT_SYMBOL(ceph_msg_new);

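/*
 * Example allocation (sketch): a message with a 128-byte front and
 * room for one data item, failing softly under memory pressure:
 *
 *	struct ceph_msg *m;
 *
 *	m = ceph_msg_new2(CEPH_MSG_OSD_OP, 128, 1, GFP_NOFS, true);
 *	if (!m)
 *		return -ENOMEM;
 *
 * With can_fail == false a NULL return is accompanied by pr_err() and
 * a WARN_ON() instead of the quiet dout() above.
 */
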
/*
 * Allocate "middle" portion of a message, if it is needed and wasn't
 * allocated by alloc_msg.  This allows us to read a small fixed-size
 * per-type header in the front and then gracefully fail (i.e.,
 * propagate the error to the caller based on info in the front) when
 * the middle is too large.
 */
static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
{
	int type = le16_to_cpu(msg->hdr.type);
	int middle_len = le32_to_cpu(msg->hdr.middle_len);

	dout("alloc_middle %p type %d %s middle_len %d\n", msg, type,
	     ceph_msg_type_name(type), middle_len);
	BUG_ON(!middle_len);
	BUG_ON(msg->middle);

	msg->middle = ceph_buffer_new(middle_len, GFP_NOFS);
	if (!msg->middle)
		return -ENOMEM;
	return 0;
}

/*
 * Allocate a message for receiving an incoming message on a
 * connection, and save the result in con->in_msg.  Uses the
 * connection's private alloc_msg op if available.
 *
 * Returns 0 on success, or a negative error code.
 *
 * On success, if we set *skip = 1:
 *  - the next message should be skipped and ignored.
 *  - con->in_msg == NULL
 * or if we set *skip = 0:
 *  - con->in_msg is non-null.
 * On error (ENOMEM, EAGAIN, ...),
 *  - con->in_msg == NULL
 */
int ceph_con_in_msg_alloc(struct ceph_connection *con,
			  struct ceph_msg_header *hdr, int *skip)
{
	int middle_len = le32_to_cpu(hdr->middle_len);
	struct ceph_msg *msg;
	int ret = 0;

	BUG_ON(con->in_msg != NULL);
	BUG_ON(!con->ops->alloc_msg);

	mutex_unlock(&con->mutex);
	msg = con->ops->alloc_msg(con, hdr, skip);
	mutex_lock(&con->mutex);
	if (con->state != CEPH_CON_S_OPEN) {
		if (msg)
			ceph_msg_put(msg);
		return -EAGAIN;
	}
	if (msg) {
		BUG_ON(*skip);

		msg_con_set(msg, con);
		con->in_msg = msg;
	} else {
		/*
		 * Null message pointer means either we should skip
		 * this message or we couldn't allocate memory.  The
		 * former is not an error.
		 */
		if (*skip)
			return 0;

		con->error_msg = "error allocating memory for incoming message";
		return -ENOMEM;
	}
	memcpy(&con->in_msg->hdr, hdr, sizeof(*hdr));

	if (middle_len && !con->in_msg->middle) {
		ret = ceph_alloc_middle(con, con->in_msg);
		if (ret < 0) {
			ceph_msg_put(con->in_msg);
			con->in_msg = NULL;
		}
	}

	return ret;
}

void ceph_con_get_out_msg(struct ceph_connection *con)
{
	struct ceph_msg *msg;

	BUG_ON(list_empty(&con->out_queue));
	msg = list_first_entry(&con->out_queue, struct ceph_msg, list_head);
	WARN_ON(msg->con != con);

	/*
	 * Put the message on "sent" list using a ref from ceph_con_send().
	 * It is put when the message is acked or revoked.
	 */
	list_move_tail(&msg->list_head, &con->out_sent);

	/*
	 * Only assign outgoing seq # if we haven't sent this message
	 * yet.  If it is requeued, resend with its original seq.
	 */
	if (msg->needs_out_seq) {
		msg->hdr.seq = cpu_to_le64(++con->out_seq);
		msg->needs_out_seq = false;

		if (con->ops->reencode_message)
			con->ops->reencode_message(msg);
	}

	/*
	 * Get a ref for out_msg.  It is put when we are done sending the
	 * message or in case of a fault.
	 */
	WARN_ON(con->out_msg);
	con->out_msg = ceph_msg_get(msg);
}

/*
 * Free a generically kmalloc'd message.
 */
static void ceph_msg_free(struct ceph_msg *m)
{
	dout("%s %p\n", __func__, m);
	kvfree(m->front.iov_base);
	kfree(m->data);
	kmem_cache_free(ceph_msg_cache, m);
}

static void ceph_msg_release(struct kref *kref)
{
	struct ceph_msg *m = container_of(kref, struct ceph_msg, kref);
	int i;

	dout("%s %p\n", __func__, m);
	WARN_ON(!list_empty(&m->list_head));

	msg_con_set(m, NULL);

	/* drop middle, data, if any */
	if (m->middle) {
		ceph_buffer_put(m->middle);
		m->middle = NULL;
	}

	for (i = 0; i < m->num_data_items; i++)
		ceph_msg_data_destroy(&m->data[i]);

	if (m->pool)
		ceph_msgpool_put(m->pool, m);
	else
		ceph_msg_free(m);
}

struct ceph_msg *ceph_msg_get(struct ceph_msg *msg)
{
	dout("%s %p (was %d)\n", __func__, msg,
	     kref_read(&msg->kref));
	kref_get(&msg->kref);
	return msg;
}
EXPORT_SYMBOL(ceph_msg_get);

void ceph_msg_put(struct ceph_msg *msg)
{
	dout("%s %p (was %d)\n", __func__, msg,
	     kref_read(&msg->kref));
	kref_put(&msg->kref, ceph_msg_release);
}
EXPORT_SYMBOL(ceph_msg_put);

void ceph_msg_dump(struct ceph_msg *msg)
{
	pr_debug("msg_dump %p (front_alloc_len %d length %zd)\n", msg,
		 msg->front_alloc_len, msg->data_length);
	print_hex_dump(KERN_DEBUG, "header: ",
		       DUMP_PREFIX_OFFSET, 16, 1,
		       &msg->hdr, sizeof(msg->hdr), true);
	print_hex_dump(KERN_DEBUG, " front: ",
		       DUMP_PREFIX_OFFSET, 16, 1,
		       msg->front.iov_base, msg->front.iov_len, true);
	if (msg->middle)
		print_hex_dump(KERN_DEBUG, "middle: ",
			       DUMP_PREFIX_OFFSET, 16, 1,
			       msg->middle->vec.iov_base,
			       msg->middle->vec.iov_len, true);
	print_hex_dump(KERN_DEBUG, "footer: ",
		       DUMP_PREFIX_OFFSET, 16, 1,
		       &msg->footer, sizeof(msg->footer), true);
}
EXPORT_SYMBOL(ceph_msg_dump);