1 // SPDX-License-Identifier: GPL-2.0-or-later
5 * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
6 * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
7 * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
11 #define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt
13 #include <linux/module.h>
14 #include <linux/rculist.h>
15 #include <linux/random.h>
19 #include "rtrs-clt-trace.h"
21 #define RTRS_CONNECT_TIMEOUT_MS 30000
23 * Wait a bit before trying to reconnect after a failure
24 * in order to give server time to finish clean up which
25 * leads to "false positives" failed reconnect attempts
27 #define RTRS_RECONNECT_BACKOFF 1000
29 * Wait for additional random time between 0 and 8 seconds
30 * before starting to reconnect to avoid clients reconnecting
31 * all at once in case of a major network outage
33 #define RTRS_RECONNECT_SEED 8
35 #define FIRST_CONN 0x01
36 /* limit to 128 * 4k = 512k max IO */
37 #define RTRS_MAX_SEGMENTS 128
39 MODULE_DESCRIPTION("RDMA Transport Client");
40 MODULE_LICENSE("GPL");
42 static const struct rtrs_rdma_dev_pd_ops dev_pd_ops
;
43 static struct rtrs_rdma_dev_pd dev_pd
= {
47 static struct workqueue_struct
*rtrs_wq
;
48 static const struct class rtrs_clt_dev_class
= {
49 .name
= "rtrs-client",
52 static inline bool rtrs_clt_is_connected(const struct rtrs_clt_sess
*clt
)
54 struct rtrs_clt_path
*clt_path
;
55 bool connected
= false;
58 list_for_each_entry_rcu(clt_path
, &clt
->paths_list
, s
.entry
)
59 if (READ_ONCE(clt_path
->state
) == RTRS_CLT_CONNECTED
) {
68 static struct rtrs_permit
*
69 __rtrs_get_permit(struct rtrs_clt_sess
*clt
, enum rtrs_clt_con_type con_type
)
71 size_t max_depth
= clt
->queue_depth
;
72 struct rtrs_permit
*permit
;
76 * Adapted from null_blk get_tag(). Callers from different cpus may
77 * grab the same bit, since find_first_zero_bit is not atomic.
78 * But then the test_and_set_bit_lock will fail for all the
79 * callers but one, so that they will loop again.
80 * This way an explicit spinlock is not required.
83 bit
= find_first_zero_bit(clt
->permits_map
, max_depth
);
86 } while (test_and_set_bit_lock(bit
, clt
->permits_map
));
88 permit
= get_permit(clt
, bit
);
89 WARN_ON(permit
->mem_id
!= bit
);
90 permit
->cpu_id
= raw_smp_processor_id();
91 permit
->con_type
= con_type
;
96 static inline void __rtrs_put_permit(struct rtrs_clt_sess
*clt
,
97 struct rtrs_permit
*permit
)
99 clear_bit_unlock(permit
->mem_id
, clt
->permits_map
);
103 * rtrs_clt_get_permit() - allocates permit for future RDMA operation
104 * @clt: Current session
105 * @con_type: Type of connection to use with the permit
106 * @can_wait: Wait type
109 * Allocates permit for the following RDMA operation. Permit is used
110 * to preallocate all resources and to propagate memory pressure
114 * Can sleep if @wait == RTRS_PERMIT_WAIT
116 struct rtrs_permit
*rtrs_clt_get_permit(struct rtrs_clt_sess
*clt
,
117 enum rtrs_clt_con_type con_type
,
118 enum wait_type can_wait
)
120 struct rtrs_permit
*permit
;
123 permit
= __rtrs_get_permit(clt
, con_type
);
124 if (permit
|| !can_wait
)
128 prepare_to_wait(&clt
->permits_wait
, &wait
,
129 TASK_UNINTERRUPTIBLE
);
130 permit
= __rtrs_get_permit(clt
, con_type
);
137 finish_wait(&clt
->permits_wait
, &wait
);
141 EXPORT_SYMBOL(rtrs_clt_get_permit
);
144 * rtrs_clt_put_permit() - puts allocated permit
145 * @clt: Current session
146 * @permit: Permit to be freed
151 void rtrs_clt_put_permit(struct rtrs_clt_sess
*clt
,
152 struct rtrs_permit
*permit
)
154 if (WARN_ON(!test_bit(permit
->mem_id
, clt
->permits_map
)))
157 __rtrs_put_permit(clt
, permit
);
160 * rtrs_clt_get_permit() adds itself to the &clt->permits_wait list
161 * before calling schedule(). So if rtrs_clt_get_permit() is sleeping
162 * it must have added itself to &clt->permits_wait before
163 * __rtrs_put_permit() finished.
164 * Hence it is safe to guard wake_up() with a waitqueue_active() test.
166 if (waitqueue_active(&clt
->permits_wait
))
167 wake_up(&clt
->permits_wait
);
169 EXPORT_SYMBOL(rtrs_clt_put_permit
);
172 * rtrs_permit_to_clt_con() - returns RDMA connection pointer by the permit
173 * @clt_path: client path pointer
174 * @permit: permit for the allocation of the RDMA buffer
176 * IO connection starts from 1.
177 * 0 connection is for user messages.
180 struct rtrs_clt_con
*rtrs_permit_to_clt_con(struct rtrs_clt_path
*clt_path
,
181 struct rtrs_permit
*permit
)
185 if (permit
->con_type
== RTRS_IO_CON
)
186 id
= (permit
->cpu_id
% (clt_path
->s
.irq_con_num
- 1)) + 1;
188 return to_clt_con(clt_path
->s
.con
[id
]);
192 * rtrs_clt_change_state() - change the session state through session state
195 * @clt_path: client path to change the state of.
196 * @new_state: state to change to.
198 * returns true if sess's state is changed to new state, otherwise return false.
201 * state_wq lock must be hold.
203 static bool rtrs_clt_change_state(struct rtrs_clt_path
*clt_path
,
204 enum rtrs_clt_state new_state
)
206 enum rtrs_clt_state old_state
;
207 bool changed
= false;
209 lockdep_assert_held(&clt_path
->state_wq
.lock
);
211 old_state
= clt_path
->state
;
213 case RTRS_CLT_CONNECTING
:
215 case RTRS_CLT_RECONNECTING
:
222 case RTRS_CLT_RECONNECTING
:
224 case RTRS_CLT_CONNECTED
:
225 case RTRS_CLT_CONNECTING_ERR
:
226 case RTRS_CLT_CLOSED
:
233 case RTRS_CLT_CONNECTED
:
235 case RTRS_CLT_CONNECTING
:
242 case RTRS_CLT_CONNECTING_ERR
:
244 case RTRS_CLT_CONNECTING
:
251 case RTRS_CLT_CLOSING
:
253 case RTRS_CLT_CONNECTING
:
254 case RTRS_CLT_CONNECTING_ERR
:
255 case RTRS_CLT_RECONNECTING
:
256 case RTRS_CLT_CONNECTED
:
263 case RTRS_CLT_CLOSED
:
265 case RTRS_CLT_CLOSING
:
274 case RTRS_CLT_CLOSED
:
285 clt_path
->state
= new_state
;
286 wake_up_locked(&clt_path
->state_wq
);
292 static bool rtrs_clt_change_state_from_to(struct rtrs_clt_path
*clt_path
,
293 enum rtrs_clt_state old_state
,
294 enum rtrs_clt_state new_state
)
296 bool changed
= false;
298 spin_lock_irq(&clt_path
->state_wq
.lock
);
299 if (clt_path
->state
== old_state
)
300 changed
= rtrs_clt_change_state(clt_path
, new_state
);
301 spin_unlock_irq(&clt_path
->state_wq
.lock
);
306 static void rtrs_clt_stop_and_destroy_conns(struct rtrs_clt_path
*clt_path
);
307 static void rtrs_rdma_error_recovery(struct rtrs_clt_con
*con
)
309 struct rtrs_clt_path
*clt_path
= to_clt_path(con
->c
.path
);
311 trace_rtrs_rdma_error_recovery(clt_path
);
313 if (rtrs_clt_change_state_from_to(clt_path
,
315 RTRS_CLT_RECONNECTING
)) {
316 queue_work(rtrs_wq
, &clt_path
->err_recovery_work
);
319 * Error can happen just on establishing new connection,
320 * so notify waiter with error state, waiter is responsible
321 * for cleaning the rest and reconnect if needed.
323 rtrs_clt_change_state_from_to(clt_path
,
325 RTRS_CLT_CONNECTING_ERR
);
329 static void rtrs_clt_fast_reg_done(struct ib_cq
*cq
, struct ib_wc
*wc
)
331 struct rtrs_clt_con
*con
= to_clt_con(wc
->qp
->qp_context
);
333 if (wc
->status
!= IB_WC_SUCCESS
) {
334 rtrs_err_rl(con
->c
.path
, "Failed IB_WR_REG_MR: %s\n",
335 ib_wc_status_msg(wc
->status
));
336 rtrs_rdma_error_recovery(con
);
340 static struct ib_cqe fast_reg_cqe
= {
341 .done
= rtrs_clt_fast_reg_done
344 static void complete_rdma_req(struct rtrs_clt_io_req
*req
, int errno
,
345 bool notify
, bool can_wait
);
347 static void rtrs_clt_inv_rkey_done(struct ib_cq
*cq
, struct ib_wc
*wc
)
349 struct rtrs_clt_io_req
*req
=
350 container_of(wc
->wr_cqe
, typeof(*req
), inv_cqe
);
351 struct rtrs_clt_con
*con
= to_clt_con(wc
->qp
->qp_context
);
353 if (wc
->status
!= IB_WC_SUCCESS
) {
354 rtrs_err_rl(con
->c
.path
, "Failed IB_WR_LOCAL_INV: %s\n",
355 ib_wc_status_msg(wc
->status
));
356 rtrs_rdma_error_recovery(con
);
358 req
->mr
->need_inval
= false;
359 if (req
->need_inv_comp
)
360 complete(&req
->inv_comp
);
362 /* Complete request from INV callback */
363 complete_rdma_req(req
, req
->inv_errno
, true, false);
366 static int rtrs_inv_rkey(struct rtrs_clt_io_req
*req
)
368 struct rtrs_clt_con
*con
= req
->con
;
369 struct ib_send_wr wr
= {
370 .opcode
= IB_WR_LOCAL_INV
,
371 .wr_cqe
= &req
->inv_cqe
,
372 .send_flags
= IB_SEND_SIGNALED
,
373 .ex
.invalidate_rkey
= req
->mr
->rkey
,
375 req
->inv_cqe
.done
= rtrs_clt_inv_rkey_done
;
377 return ib_post_send(con
->c
.qp
, &wr
, NULL
);
380 static void complete_rdma_req(struct rtrs_clt_io_req
*req
, int errno
,
381 bool notify
, bool can_wait
)
383 struct rtrs_clt_con
*con
= req
->con
;
384 struct rtrs_clt_path
*clt_path
;
389 if (WARN_ON(!req
->con
))
391 clt_path
= to_clt_path(con
->c
.path
);
394 if (req
->mr
->need_inval
) {
396 * We are here to invalidate read/write requests
397 * ourselves. In normal scenario server should
398 * send INV for all read requests, we do local
399 * invalidate for write requests ourselves, but
400 * we are here, thus three things could happen:
402 * 1. this is failover, when errno != 0
405 * 2. something totally bad happened and
406 * server forgot to send INV, so we
407 * should do that ourselves.
409 * 3. write request finishes, we need to do local
414 req
->need_inv_comp
= true;
416 /* This should be IO path, so always notify */
418 /* Save errno for INV callback */
419 req
->inv_errno
= errno
;
422 refcount_inc(&req
->ref
);
423 err
= rtrs_inv_rkey(req
);
425 rtrs_err_rl(con
->c
.path
, "Send INV WR key=%#x: %d\n",
427 } else if (can_wait
) {
428 wait_for_completion(&req
->inv_comp
);
430 if (!refcount_dec_and_test(&req
->ref
))
433 ib_dma_unmap_sg(clt_path
->s
.dev
->ib_dev
, req
->sglist
,
434 req
->sg_cnt
, req
->dir
);
436 if (!refcount_dec_and_test(&req
->ref
))
438 if (req
->mp_policy
== MP_POLICY_MIN_INFLIGHT
)
439 atomic_dec(&clt_path
->stats
->inflight
);
445 rtrs_err_rl(con
->c
.path
,
446 "IO %s request failed: error=%d path=%s [%s:%u] notify=%d\n",
447 req
->dir
== DMA_TO_DEVICE
? "write" : "read", errno
,
448 kobject_name(&clt_path
->kobj
), clt_path
->hca_name
,
449 clt_path
->hca_port
, notify
);
453 req
->conf(req
->priv
, errno
);
456 static int rtrs_post_send_rdma(struct rtrs_clt_con
*con
,
457 struct rtrs_clt_io_req
*req
,
458 struct rtrs_rbuf
*rbuf
, u32 off
,
459 u32 imm
, struct ib_send_wr
*wr
)
461 struct rtrs_clt_path
*clt_path
= to_clt_path(con
->c
.path
);
462 enum ib_send_flags flags
;
466 rtrs_wrn(con
->c
.path
,
467 "Doing RDMA Write failed, no data supplied\n");
471 /* user data and user message in the first list element */
472 sge
.addr
= req
->iu
->dma_addr
;
473 sge
.length
= req
->sg_size
;
474 sge
.lkey
= clt_path
->s
.dev
->ib_pd
->local_dma_lkey
;
477 * From time to time we have to post signalled sends,
478 * or send queue will fill up and only QP reset can help.
480 flags
= atomic_inc_return(&con
->c
.wr_cnt
) % clt_path
->s
.signal_interval
?
481 0 : IB_SEND_SIGNALED
;
483 ib_dma_sync_single_for_device(clt_path
->s
.dev
->ib_dev
,
485 req
->sg_size
, DMA_TO_DEVICE
);
487 return rtrs_iu_post_rdma_write_imm(&con
->c
, req
->iu
, &sge
, 1,
488 rbuf
->rkey
, rbuf
->addr
+ off
,
489 imm
, flags
, wr
, NULL
);
492 static void process_io_rsp(struct rtrs_clt_path
*clt_path
, u32 msg_id
,
493 s16 errno
, bool w_inval
)
495 struct rtrs_clt_io_req
*req
;
497 if (WARN_ON(msg_id
>= clt_path
->queue_depth
))
500 req
= &clt_path
->reqs
[msg_id
];
501 /* Drop need_inv if server responded with send with invalidation */
502 req
->mr
->need_inval
&= !w_inval
;
503 complete_rdma_req(req
, errno
, true, false);
506 static void rtrs_clt_recv_done(struct rtrs_clt_con
*con
, struct ib_wc
*wc
)
510 struct rtrs_clt_path
*clt_path
= to_clt_path(con
->c
.path
);
512 WARN_ON((clt_path
->flags
& RTRS_MSG_NEW_RKEY_F
) == 0);
513 iu
= container_of(wc
->wr_cqe
, struct rtrs_iu
,
515 err
= rtrs_iu_post_recv(&con
->c
, iu
);
517 rtrs_err(con
->c
.path
, "post iu failed %d\n", err
);
518 rtrs_rdma_error_recovery(con
);
522 static void rtrs_clt_rkey_rsp_done(struct rtrs_clt_con
*con
, struct ib_wc
*wc
)
524 struct rtrs_clt_path
*clt_path
= to_clt_path(con
->c
.path
);
525 struct rtrs_msg_rkey_rsp
*msg
;
526 u32 imm_type
, imm_payload
;
527 bool w_inval
= false;
532 WARN_ON((clt_path
->flags
& RTRS_MSG_NEW_RKEY_F
) == 0);
534 iu
= container_of(wc
->wr_cqe
, struct rtrs_iu
, cqe
);
536 if (wc
->byte_len
< sizeof(*msg
)) {
537 rtrs_err(con
->c
.path
, "rkey response is malformed: size %d\n",
541 ib_dma_sync_single_for_cpu(clt_path
->s
.dev
->ib_dev
, iu
->dma_addr
,
542 iu
->size
, DMA_FROM_DEVICE
);
544 if (le16_to_cpu(msg
->type
) != RTRS_MSG_RKEY_RSP
) {
545 rtrs_err(clt_path
->clt
,
546 "rkey response is malformed: type %d\n",
547 le16_to_cpu(msg
->type
));
550 buf_id
= le16_to_cpu(msg
->buf_id
);
551 if (WARN_ON(buf_id
>= clt_path
->queue_depth
))
554 rtrs_from_imm(be32_to_cpu(wc
->ex
.imm_data
), &imm_type
, &imm_payload
);
555 if (imm_type
== RTRS_IO_RSP_IMM
||
556 imm_type
== RTRS_IO_RSP_W_INV_IMM
) {
559 w_inval
= (imm_type
== RTRS_IO_RSP_W_INV_IMM
);
560 rtrs_from_io_rsp_imm(imm_payload
, &msg_id
, &err
);
562 if (WARN_ON(buf_id
!= msg_id
))
564 clt_path
->rbufs
[buf_id
].rkey
= le32_to_cpu(msg
->rkey
);
565 process_io_rsp(clt_path
, msg_id
, err
, w_inval
);
567 ib_dma_sync_single_for_device(clt_path
->s
.dev
->ib_dev
, iu
->dma_addr
,
568 iu
->size
, DMA_FROM_DEVICE
);
569 return rtrs_clt_recv_done(con
, wc
);
571 rtrs_rdma_error_recovery(con
);
574 static void rtrs_clt_rdma_done(struct ib_cq
*cq
, struct ib_wc
*wc
);
576 static struct ib_cqe io_comp_cqe
= {
577 .done
= rtrs_clt_rdma_done
581 * Post x2 empty WRs: first is for this RDMA with IMM,
582 * second is for RECV with INV, which happened earlier.
584 static int rtrs_post_recv_empty_x2(struct rtrs_con
*con
, struct ib_cqe
*cqe
)
586 struct ib_recv_wr wr_arr
[2], *wr
;
589 memset(wr_arr
, 0, sizeof(wr_arr
));
590 for (i
= 0; i
< ARRAY_SIZE(wr_arr
); i
++) {
594 /* Chain backwards */
595 wr
->next
= &wr_arr
[i
- 1];
598 return ib_post_recv(con
->qp
, wr
, NULL
);
601 static void rtrs_clt_rdma_done(struct ib_cq
*cq
, struct ib_wc
*wc
)
603 struct rtrs_clt_con
*con
= to_clt_con(wc
->qp
->qp_context
);
604 struct rtrs_clt_path
*clt_path
= to_clt_path(con
->c
.path
);
605 u32 imm_type
, imm_payload
;
606 bool w_inval
= false;
609 if (wc
->status
!= IB_WC_SUCCESS
) {
610 if (wc
->status
!= IB_WC_WR_FLUSH_ERR
) {
611 rtrs_err(clt_path
->clt
, "RDMA failed: %s\n",
612 ib_wc_status_msg(wc
->status
));
613 rtrs_rdma_error_recovery(con
);
617 rtrs_clt_update_wc_stats(con
);
619 switch (wc
->opcode
) {
620 case IB_WC_RECV_RDMA_WITH_IMM
:
622 * post_recv() RDMA write completions of IO reqs (read/write)
625 if (WARN_ON(wc
->wr_cqe
->done
!= rtrs_clt_rdma_done
))
627 clt_path
->s
.hb_missed_cnt
= 0;
628 rtrs_from_imm(be32_to_cpu(wc
->ex
.imm_data
),
629 &imm_type
, &imm_payload
);
630 if (imm_type
== RTRS_IO_RSP_IMM
||
631 imm_type
== RTRS_IO_RSP_W_INV_IMM
) {
634 w_inval
= (imm_type
== RTRS_IO_RSP_W_INV_IMM
);
635 rtrs_from_io_rsp_imm(imm_payload
, &msg_id
, &err
);
637 process_io_rsp(clt_path
, msg_id
, err
, w_inval
);
638 } else if (imm_type
== RTRS_HB_MSG_IMM
) {
640 rtrs_send_hb_ack(&clt_path
->s
);
641 if (clt_path
->flags
& RTRS_MSG_NEW_RKEY_F
)
642 return rtrs_clt_recv_done(con
, wc
);
643 } else if (imm_type
== RTRS_HB_ACK_IMM
) {
645 clt_path
->s
.hb_cur_latency
=
646 ktime_sub(ktime_get(), clt_path
->s
.hb_last_sent
);
647 if (clt_path
->flags
& RTRS_MSG_NEW_RKEY_F
)
648 return rtrs_clt_recv_done(con
, wc
);
650 rtrs_wrn(con
->c
.path
, "Unknown IMM type %u\n",
655 * Post x2 empty WRs: first is for this RDMA with IMM,
656 * second is for RECV with INV, which happened earlier.
658 err
= rtrs_post_recv_empty_x2(&con
->c
, &io_comp_cqe
);
660 err
= rtrs_post_recv_empty(&con
->c
, &io_comp_cqe
);
662 rtrs_err(con
->c
.path
, "rtrs_post_recv_empty(): %d\n",
664 rtrs_rdma_error_recovery(con
);
669 * Key invalidations from server side
671 clt_path
->s
.hb_missed_cnt
= 0;
672 WARN_ON(!(wc
->wc_flags
& IB_WC_WITH_INVALIDATE
||
673 wc
->wc_flags
& IB_WC_WITH_IMM
));
674 WARN_ON(wc
->wr_cqe
->done
!= rtrs_clt_rdma_done
);
675 if (clt_path
->flags
& RTRS_MSG_NEW_RKEY_F
) {
676 if (wc
->wc_flags
& IB_WC_WITH_INVALIDATE
)
677 return rtrs_clt_recv_done(con
, wc
);
679 return rtrs_clt_rkey_rsp_done(con
, wc
);
682 case IB_WC_RDMA_WRITE
:
684 * post_send() RDMA write completions of IO reqs (read/write)
690 rtrs_wrn(clt_path
->clt
, "Unexpected WC type: %d\n", wc
->opcode
);
695 static int post_recv_io(struct rtrs_clt_con
*con
, size_t q_size
)
698 struct rtrs_clt_path
*clt_path
= to_clt_path(con
->c
.path
);
700 for (i
= 0; i
< q_size
; i
++) {
701 if (clt_path
->flags
& RTRS_MSG_NEW_RKEY_F
) {
702 struct rtrs_iu
*iu
= &con
->rsp_ius
[i
];
704 err
= rtrs_iu_post_recv(&con
->c
, iu
);
706 err
= rtrs_post_recv_empty(&con
->c
, &io_comp_cqe
);
715 static int post_recv_path(struct rtrs_clt_path
*clt_path
)
720 for (cid
= 0; cid
< clt_path
->s
.con_num
; cid
++) {
722 q_size
= SERVICE_CON_QUEUE_DEPTH
;
724 q_size
= clt_path
->queue_depth
;
727 * x2 for RDMA read responses + FR key invalidations,
728 * RDMA writes do not require any FR registrations.
732 err
= post_recv_io(to_clt_con(clt_path
->s
.con
[cid
]), q_size
);
734 rtrs_err(clt_path
->clt
, "post_recv_io(), err: %d\n",
745 struct list_head skip_list
;
746 struct rtrs_clt_sess
*clt
;
747 struct rtrs_clt_path
*(*next_path
)(struct path_it
*it
);
751 * rtrs_clt_get_next_path_or_null - get clt path from the list or return NULL
752 * @head: the head for the list.
753 * @clt_path: The element to take the next clt_path from.
755 * Next clt path returned in round-robin fashion, i.e. head will be skipped,
756 * but if list is observed as empty, NULL will be returned.
758 * This function may safely run concurrently with the _rcu list-mutation
759 * primitives such as list_add_rcu() as long as it's guarded by rcu_read_lock().
761 static inline struct rtrs_clt_path
*
762 rtrs_clt_get_next_path_or_null(struct list_head
*head
, struct rtrs_clt_path
*clt_path
)
764 return list_next_or_null_rcu(head
, &clt_path
->s
.entry
, typeof(*clt_path
), s
.entry
) ?:
765 list_next_or_null_rcu(head
,
766 READ_ONCE((&clt_path
->s
.entry
)->next
),
767 typeof(*clt_path
), s
.entry
);
771 * get_next_path_rr() - Returns path in round-robin fashion.
772 * @it: the path pointer
774 * Related to @MP_POLICY_RR
777 * rcu_read_lock() must be held.
779 static struct rtrs_clt_path
*get_next_path_rr(struct path_it
*it
)
781 struct rtrs_clt_path __rcu
**ppcpu_path
;
782 struct rtrs_clt_path
*path
;
783 struct rtrs_clt_sess
*clt
;
786 * Assert that rcu lock must be held
788 RCU_LOCKDEP_WARN(!rcu_read_lock_held(), "no rcu read lock held");
793 * Here we use two RCU objects: @paths_list and @pcpu_path
794 * pointer. See rtrs_clt_remove_path_from_arr() for details
795 * how that is handled.
798 ppcpu_path
= this_cpu_ptr(clt
->pcpu_path
);
799 path
= rcu_dereference(*ppcpu_path
);
801 path
= list_first_or_null_rcu(&clt
->paths_list
,
802 typeof(*path
), s
.entry
);
804 path
= rtrs_clt_get_next_path_or_null(&clt
->paths_list
, path
);
806 rcu_assign_pointer(*ppcpu_path
, path
);
812 * get_next_path_min_inflight() - Returns path with minimal inflight count.
813 * @it: the path pointer
815 * Related to @MP_POLICY_MIN_INFLIGHT
818 * rcu_read_lock() must be hold.
820 static struct rtrs_clt_path
*get_next_path_min_inflight(struct path_it
*it
)
822 struct rtrs_clt_path
*min_path
= NULL
;
823 struct rtrs_clt_sess
*clt
= it
->clt
;
824 struct rtrs_clt_path
*clt_path
;
825 int min_inflight
= INT_MAX
;
828 list_for_each_entry_rcu(clt_path
, &clt
->paths_list
, s
.entry
) {
829 if (READ_ONCE(clt_path
->state
) != RTRS_CLT_CONNECTED
)
832 if (!list_empty(raw_cpu_ptr(clt_path
->mp_skip_entry
)))
835 inflight
= atomic_read(&clt_path
->stats
->inflight
);
837 if (inflight
< min_inflight
) {
838 min_inflight
= inflight
;
844 * add the path to the skip list, so that next time we can get
848 list_add(raw_cpu_ptr(min_path
->mp_skip_entry
), &it
->skip_list
);
854 * get_next_path_min_latency() - Returns path with minimal latency.
855 * @it: the path pointer
857 * Return: a path with the lowest latency or NULL if all paths are tried
860 * rcu_read_lock() must be hold.
862 * Related to @MP_POLICY_MIN_LATENCY
864 * This DOES skip an already-tried path.
865 * There is a skip-list to skip a path if the path has tried but failed.
866 * It will try the minimum latency path and then the second minimum latency
867 * path and so on. Finally it will return NULL if all paths are tried.
868 * Therefore the caller MUST check the returned
869 * path is NULL and trigger the IO error.
871 static struct rtrs_clt_path
*get_next_path_min_latency(struct path_it
*it
)
873 struct rtrs_clt_path
*min_path
= NULL
;
874 struct rtrs_clt_sess
*clt
= it
->clt
;
875 struct rtrs_clt_path
*clt_path
;
876 ktime_t min_latency
= KTIME_MAX
;
879 list_for_each_entry_rcu(clt_path
, &clt
->paths_list
, s
.entry
) {
880 if (READ_ONCE(clt_path
->state
) != RTRS_CLT_CONNECTED
)
883 if (!list_empty(raw_cpu_ptr(clt_path
->mp_skip_entry
)))
886 latency
= clt_path
->s
.hb_cur_latency
;
888 if (latency
< min_latency
) {
889 min_latency
= latency
;
895 * add the path to the skip list, so that next time we can get
899 list_add(raw_cpu_ptr(min_path
->mp_skip_entry
), &it
->skip_list
);
904 static inline void path_it_init(struct path_it
*it
, struct rtrs_clt_sess
*clt
)
906 INIT_LIST_HEAD(&it
->skip_list
);
910 if (clt
->mp_policy
== MP_POLICY_RR
)
911 it
->next_path
= get_next_path_rr
;
912 else if (clt
->mp_policy
== MP_POLICY_MIN_INFLIGHT
)
913 it
->next_path
= get_next_path_min_inflight
;
915 it
->next_path
= get_next_path_min_latency
;
918 static inline void path_it_deinit(struct path_it
*it
)
920 struct list_head
*skip
, *tmp
;
922 * The skip_list is used only for the MIN_INFLIGHT and MIN_LATENCY policies.
923 * We need to remove paths from it, so that next IO can insert
924 * paths (->mp_skip_entry) into a skip_list again.
926 list_for_each_safe(skip
, tmp
, &it
->skip_list
)
931 * rtrs_clt_init_req() - Initialize an rtrs_clt_io_req holding information
932 * about an inflight IO.
933 * The user buffer holding user control message (not data) is copied into
934 * the corresponding buffer of rtrs_iu (req->iu->buf), which later on will
935 * also hold the control message of rtrs.
936 * @req: an io request holding information about IO.
937 * @clt_path: client path
938 * @conf: conformation callback function to notify upper layer.
939 * @permit: permit for allocation of RDMA remote buffer
940 * @priv: private pointer
941 * @vec: kernel vector containing control message
942 * @usr_len: length of the user message
943 * @sg: scater list for IO data
944 * @sg_cnt: number of scater list entries
945 * @data_len: length of the IO data
946 * @dir: direction of the IO.
948 static void rtrs_clt_init_req(struct rtrs_clt_io_req
*req
,
949 struct rtrs_clt_path
*clt_path
,
950 void (*conf
)(void *priv
, int errno
),
951 struct rtrs_permit
*permit
, void *priv
,
952 const struct kvec
*vec
, size_t usr_len
,
953 struct scatterlist
*sg
, size_t sg_cnt
,
954 size_t data_len
, int dir
)
956 struct iov_iter iter
;
959 req
->permit
= permit
;
961 req
->usr_len
= usr_len
;
962 req
->data_len
= data_len
;
964 req
->sg_cnt
= sg_cnt
;
967 req
->con
= rtrs_permit_to_clt_con(clt_path
, permit
);
969 req
->mr
->need_inval
= false;
970 req
->need_inv_comp
= false;
972 refcount_set(&req
->ref
, 1);
973 req
->mp_policy
= clt_path
->clt
->mp_policy
;
975 iov_iter_kvec(&iter
, ITER_SOURCE
, vec
, 1, usr_len
);
976 len
= _copy_from_iter(req
->iu
->buf
, usr_len
, &iter
);
977 WARN_ON(len
!= usr_len
);
979 reinit_completion(&req
->inv_comp
);
982 static struct rtrs_clt_io_req
*
983 rtrs_clt_get_req(struct rtrs_clt_path
*clt_path
,
984 void (*conf
)(void *priv
, int errno
),
985 struct rtrs_permit
*permit
, void *priv
,
986 const struct kvec
*vec
, size_t usr_len
,
987 struct scatterlist
*sg
, size_t sg_cnt
,
988 size_t data_len
, int dir
)
990 struct rtrs_clt_io_req
*req
;
992 req
= &clt_path
->reqs
[permit
->mem_id
];
993 rtrs_clt_init_req(req
, clt_path
, conf
, permit
, priv
, vec
, usr_len
,
994 sg
, sg_cnt
, data_len
, dir
);
998 static struct rtrs_clt_io_req
*
999 rtrs_clt_get_copy_req(struct rtrs_clt_path
*alive_path
,
1000 struct rtrs_clt_io_req
*fail_req
)
1002 struct rtrs_clt_io_req
*req
;
1004 .iov_base
= fail_req
->iu
->buf
,
1005 .iov_len
= fail_req
->usr_len
1008 req
= &alive_path
->reqs
[fail_req
->permit
->mem_id
];
1009 rtrs_clt_init_req(req
, alive_path
, fail_req
->conf
, fail_req
->permit
,
1010 fail_req
->priv
, &vec
, fail_req
->usr_len
,
1011 fail_req
->sglist
, fail_req
->sg_cnt
,
1012 fail_req
->data_len
, fail_req
->dir
);
1016 static int rtrs_post_rdma_write_sg(struct rtrs_clt_con
*con
,
1017 struct rtrs_clt_io_req
*req
,
1018 struct rtrs_rbuf
*rbuf
, bool fr_en
,
1019 u32 count
, u32 size
, u32 imm
,
1020 struct ib_send_wr
*wr
,
1021 struct ib_send_wr
*tail
)
1023 struct rtrs_clt_path
*clt_path
= to_clt_path(con
->c
.path
);
1024 struct ib_sge
*sge
= req
->sge
;
1025 enum ib_send_flags flags
;
1026 struct scatterlist
*sg
;
1029 struct ib_send_wr
*ptail
= NULL
;
1033 sge
[i
].addr
= req
->mr
->iova
;
1034 sge
[i
].length
= req
->mr
->length
;
1035 sge
[i
].lkey
= req
->mr
->lkey
;
1040 for_each_sg(req
->sglist
, sg
, count
, i
) {
1041 sge
[i
].addr
= sg_dma_address(sg
);
1042 sge
[i
].length
= sg_dma_len(sg
);
1043 sge
[i
].lkey
= clt_path
->s
.dev
->ib_pd
->local_dma_lkey
;
1045 num_sge
= 1 + count
;
1047 sge
[i
].addr
= req
->iu
->dma_addr
;
1048 sge
[i
].length
= size
;
1049 sge
[i
].lkey
= clt_path
->s
.dev
->ib_pd
->local_dma_lkey
;
1052 * From time to time we have to post signalled sends,
1053 * or send queue will fill up and only QP reset can help.
1055 flags
= atomic_inc_return(&con
->c
.wr_cnt
) % clt_path
->s
.signal_interval
?
1056 0 : IB_SEND_SIGNALED
;
1058 ib_dma_sync_single_for_device(clt_path
->s
.dev
->ib_dev
,
1060 size
, DMA_TO_DEVICE
);
1062 return rtrs_iu_post_rdma_write_imm(&con
->c
, req
->iu
, sge
, num_sge
,
1063 rbuf
->rkey
, rbuf
->addr
, imm
,
1067 static int rtrs_map_sg_fr(struct rtrs_clt_io_req
*req
, size_t count
)
1071 /* Align the MR to a 4K page size to match the block virt boundary */
1072 nr
= ib_map_mr_sg(req
->mr
, req
->sglist
, count
, NULL
, SZ_4K
);
1074 return nr
< 0 ? nr
: -EINVAL
;
1075 ib_update_fast_reg_key(req
->mr
, ib_inc_rkey(req
->mr
->rkey
));
1080 static int rtrs_clt_write_req(struct rtrs_clt_io_req
*req
)
1082 struct rtrs_clt_con
*con
= req
->con
;
1083 struct rtrs_path
*s
= con
->c
.path
;
1084 struct rtrs_clt_path
*clt_path
= to_clt_path(s
);
1085 struct rtrs_msg_rdma_write
*msg
;
1087 struct rtrs_rbuf
*rbuf
;
1090 struct ib_reg_wr rwr
;
1091 struct ib_send_wr
*wr
= NULL
;
1094 const size_t tsize
= sizeof(*msg
) + req
->data_len
+ req
->usr_len
;
1096 if (tsize
> clt_path
->chunk_size
) {
1097 rtrs_wrn(s
, "Write request failed, size too big %zu > %d\n",
1098 tsize
, clt_path
->chunk_size
);
1102 count
= ib_dma_map_sg(clt_path
->s
.dev
->ib_dev
, req
->sglist
,
1103 req
->sg_cnt
, req
->dir
);
1105 rtrs_wrn(s
, "Write request failed, map failed\n");
1109 /* put rtrs msg after sg and user message */
1110 msg
= req
->iu
->buf
+ req
->usr_len
;
1111 msg
->type
= cpu_to_le16(RTRS_MSG_WRITE
);
1112 msg
->usr_len
= cpu_to_le16(req
->usr_len
);
1114 /* rtrs message on server side will be after user data and message */
1115 imm
= req
->permit
->mem_off
+ req
->data_len
+ req
->usr_len
;
1116 imm
= rtrs_to_io_req_imm(imm
);
1117 buf_id
= req
->permit
->mem_id
;
1118 req
->sg_size
= tsize
;
1119 rbuf
= &clt_path
->rbufs
[buf_id
];
1122 ret
= rtrs_map_sg_fr(req
, count
);
1125 "Write request failed, failed to map fast reg. data, err: %d\n",
1127 ib_dma_unmap_sg(clt_path
->s
.dev
->ib_dev
, req
->sglist
,
1128 req
->sg_cnt
, req
->dir
);
1131 rwr
= (struct ib_reg_wr
) {
1132 .wr
.opcode
= IB_WR_REG_MR
,
1133 .wr
.wr_cqe
= &fast_reg_cqe
,
1135 .key
= req
->mr
->rkey
,
1136 .access
= (IB_ACCESS_LOCAL_WRITE
),
1140 req
->mr
->need_inval
= true;
1143 * Update stats now, after request is successfully sent it is not
1144 * safe anymore to touch it.
1146 rtrs_clt_update_all_stats(req
, WRITE
);
1148 ret
= rtrs_post_rdma_write_sg(req
->con
, req
, rbuf
, fr_en
, count
,
1149 req
->usr_len
+ sizeof(*msg
),
1153 "Write request failed: error=%d path=%s [%s:%u]\n",
1154 ret
, kobject_name(&clt_path
->kobj
), clt_path
->hca_name
,
1155 clt_path
->hca_port
);
1156 if (req
->mp_policy
== MP_POLICY_MIN_INFLIGHT
)
1157 atomic_dec(&clt_path
->stats
->inflight
);
1158 if (req
->mr
->need_inval
) {
1159 req
->mr
->need_inval
= false;
1160 refcount_dec(&req
->ref
);
1163 ib_dma_unmap_sg(clt_path
->s
.dev
->ib_dev
, req
->sglist
,
1164 req
->sg_cnt
, req
->dir
);
1170 static int rtrs_clt_read_req(struct rtrs_clt_io_req
*req
)
1172 struct rtrs_clt_con
*con
= req
->con
;
1173 struct rtrs_path
*s
= con
->c
.path
;
1174 struct rtrs_clt_path
*clt_path
= to_clt_path(s
);
1175 struct rtrs_msg_rdma_read
*msg
;
1176 struct rtrs_ib_dev
*dev
= clt_path
->s
.dev
;
1178 struct ib_reg_wr rwr
;
1179 struct ib_send_wr
*wr
= NULL
;
1184 const size_t tsize
= sizeof(*msg
) + req
->data_len
+ req
->usr_len
;
1186 if (tsize
> clt_path
->chunk_size
) {
1188 "Read request failed, message size is %zu, bigger than CHUNK_SIZE %d\n",
1189 tsize
, clt_path
->chunk_size
);
1194 count
= ib_dma_map_sg(dev
->ib_dev
, req
->sglist
, req
->sg_cnt
,
1198 "Read request failed, dma map failed\n");
1202 /* put our message into req->buf after user message*/
1203 msg
= req
->iu
->buf
+ req
->usr_len
;
1204 msg
->type
= cpu_to_le16(RTRS_MSG_READ
);
1205 msg
->usr_len
= cpu_to_le16(req
->usr_len
);
1208 ret
= rtrs_map_sg_fr(req
, count
);
1211 "Read request failed, failed to map fast reg. data, err: %d\n",
1213 ib_dma_unmap_sg(dev
->ib_dev
, req
->sglist
, req
->sg_cnt
,
1217 rwr
= (struct ib_reg_wr
) {
1218 .wr
.opcode
= IB_WR_REG_MR
,
1219 .wr
.wr_cqe
= &fast_reg_cqe
,
1221 .key
= req
->mr
->rkey
,
1222 .access
= (IB_ACCESS_LOCAL_WRITE
|
1223 IB_ACCESS_REMOTE_WRITE
),
1227 msg
->sg_cnt
= cpu_to_le16(1);
1228 msg
->flags
= cpu_to_le16(RTRS_MSG_NEED_INVAL_F
);
1230 msg
->desc
[0].addr
= cpu_to_le64(req
->mr
->iova
);
1231 msg
->desc
[0].key
= cpu_to_le32(req
->mr
->rkey
);
1232 msg
->desc
[0].len
= cpu_to_le32(req
->mr
->length
);
1234 /* Further invalidation is required */
1235 req
->mr
->need_inval
= !!RTRS_MSG_NEED_INVAL_F
;
1242 * rtrs message will be after the space reserved for disk data and
1245 imm
= req
->permit
->mem_off
+ req
->data_len
+ req
->usr_len
;
1246 imm
= rtrs_to_io_req_imm(imm
);
1247 buf_id
= req
->permit
->mem_id
;
1249 req
->sg_size
= sizeof(*msg
);
1250 req
->sg_size
+= le16_to_cpu(msg
->sg_cnt
) * sizeof(struct rtrs_sg_desc
);
1251 req
->sg_size
+= req
->usr_len
;
1254 * Update stats now, after request is successfully sent it is not
1255 * safe anymore to touch it.
1257 rtrs_clt_update_all_stats(req
, READ
);
1259 ret
= rtrs_post_send_rdma(req
->con
, req
, &clt_path
->rbufs
[buf_id
],
1260 req
->data_len
, imm
, wr
);
1263 "Read request failed: error=%d path=%s [%s:%u]\n",
1264 ret
, kobject_name(&clt_path
->kobj
), clt_path
->hca_name
,
1265 clt_path
->hca_port
);
1266 if (req
->mp_policy
== MP_POLICY_MIN_INFLIGHT
)
1267 atomic_dec(&clt_path
->stats
->inflight
);
1268 req
->mr
->need_inval
= false;
1270 ib_dma_unmap_sg(dev
->ib_dev
, req
->sglist
,
1271 req
->sg_cnt
, req
->dir
);
1278 * rtrs_clt_failover_req() - Try to find an active path for a failed request
1280 * @fail_req: a failed io request.
1282 static int rtrs_clt_failover_req(struct rtrs_clt_sess
*clt
,
1283 struct rtrs_clt_io_req
*fail_req
)
1285 struct rtrs_clt_path
*alive_path
;
1286 struct rtrs_clt_io_req
*req
;
1287 int err
= -ECONNABORTED
;
1291 for (path_it_init(&it
, clt
);
1292 (alive_path
= it
.next_path(&it
)) && it
.i
< it
.clt
->paths_num
;
1294 if (READ_ONCE(alive_path
->state
) != RTRS_CLT_CONNECTED
)
1296 req
= rtrs_clt_get_copy_req(alive_path
, fail_req
);
1297 if (req
->dir
== DMA_TO_DEVICE
)
1298 err
= rtrs_clt_write_req(req
);
1300 err
= rtrs_clt_read_req(req
);
1302 req
->in_use
= false;
1306 rtrs_clt_inc_failover_cnt(alive_path
->stats
);
1309 path_it_deinit(&it
);
1315 static void fail_all_outstanding_reqs(struct rtrs_clt_path
*clt_path
)
1317 struct rtrs_clt_sess
*clt
= clt_path
->clt
;
1318 struct rtrs_clt_io_req
*req
;
1321 if (!clt_path
->reqs
)
1323 for (i
= 0; i
< clt_path
->queue_depth
; ++i
) {
1324 req
= &clt_path
->reqs
[i
];
1329 * Safely (without notification) complete failed request.
1330 * After completion this request is still useble and can
1331 * be failovered to another path.
1333 complete_rdma_req(req
, -ECONNABORTED
, false, true);
1335 err
= rtrs_clt_failover_req(clt
, req
);
1337 /* Failover failed, notify anyway */
1338 req
->conf(req
->priv
, err
);
1342 static void free_path_reqs(struct rtrs_clt_path
*clt_path
)
1344 struct rtrs_clt_io_req
*req
;
1347 if (!clt_path
->reqs
)
1349 for (i
= 0; i
< clt_path
->queue_depth
; ++i
) {
1350 req
= &clt_path
->reqs
[i
];
1352 ib_dereg_mr(req
->mr
);
1354 rtrs_iu_free(req
->iu
, clt_path
->s
.dev
->ib_dev
, 1);
1356 kfree(clt_path
->reqs
);
1357 clt_path
->reqs
= NULL
;
1360 static int alloc_path_reqs(struct rtrs_clt_path
*clt_path
)
1362 struct rtrs_clt_io_req
*req
;
1363 int i
, err
= -ENOMEM
;
1365 clt_path
->reqs
= kcalloc(clt_path
->queue_depth
,
1366 sizeof(*clt_path
->reqs
),
1368 if (!clt_path
->reqs
)
1371 for (i
= 0; i
< clt_path
->queue_depth
; ++i
) {
1372 req
= &clt_path
->reqs
[i
];
1373 req
->iu
= rtrs_iu_alloc(1, clt_path
->max_hdr_size
, GFP_KERNEL
,
1374 clt_path
->s
.dev
->ib_dev
,
1376 rtrs_clt_rdma_done
);
1380 req
->sge
= kcalloc(2, sizeof(*req
->sge
), GFP_KERNEL
);
1384 req
->mr
= ib_alloc_mr(clt_path
->s
.dev
->ib_pd
,
1386 clt_path
->max_pages_per_mr
);
1387 if (IS_ERR(req
->mr
)) {
1388 err
= PTR_ERR(req
->mr
);
1389 pr_err("Failed to alloc clt_path->max_pages_per_mr %d: %pe\n",
1390 clt_path
->max_pages_per_mr
, req
->mr
);
1395 init_completion(&req
->inv_comp
);
1401 free_path_reqs(clt_path
);
1406 static int alloc_permits(struct rtrs_clt_sess
*clt
)
1408 unsigned int chunk_bits
;
1411 clt
->permits_map
= bitmap_zalloc(clt
->queue_depth
, GFP_KERNEL
);
1412 if (!clt
->permits_map
) {
1416 clt
->permits
= kcalloc(clt
->queue_depth
, permit_size(clt
), GFP_KERNEL
);
1417 if (!clt
->permits
) {
1421 chunk_bits
= ilog2(clt
->queue_depth
- 1) + 1;
1422 for (i
= 0; i
< clt
->queue_depth
; i
++) {
1423 struct rtrs_permit
*permit
;
1425 permit
= get_permit(clt
, i
);
1427 permit
->mem_off
= i
<< (MAX_IMM_PAYL_BITS
- chunk_bits
);
1433 bitmap_free(clt
->permits_map
);
1434 clt
->permits_map
= NULL
;
1439 static void free_permits(struct rtrs_clt_sess
*clt
)
1441 if (clt
->permits_map
)
1442 wait_event(clt
->permits_wait
,
1443 bitmap_empty(clt
->permits_map
, clt
->queue_depth
));
1445 bitmap_free(clt
->permits_map
);
1446 clt
->permits_map
= NULL
;
1447 kfree(clt
->permits
);
1448 clt
->permits
= NULL
;
1451 static void query_fast_reg_mode(struct rtrs_clt_path
*clt_path
)
1453 struct ib_device
*ib_dev
;
1454 u64 max_pages_per_mr
;
1457 ib_dev
= clt_path
->s
.dev
->ib_dev
;
1460 * Use the smallest page size supported by the HCA, down to a
1461 * minimum of 4096 bytes. We're unlikely to build large sglists
1462 * out of smaller entries.
1464 mr_page_shift
= max(12, ffs(ib_dev
->attrs
.page_size_cap
) - 1);
1465 max_pages_per_mr
= ib_dev
->attrs
.max_mr_size
;
1466 do_div(max_pages_per_mr
, (1ull << mr_page_shift
));
1467 clt_path
->max_pages_per_mr
=
1468 min3(clt_path
->max_pages_per_mr
, (u32
)max_pages_per_mr
,
1469 ib_dev
->attrs
.max_fast_reg_page_list_len
);
1470 clt_path
->clt
->max_segments
=
1471 min(clt_path
->max_pages_per_mr
, clt_path
->clt
->max_segments
);
1474 static bool rtrs_clt_change_state_get_old(struct rtrs_clt_path
*clt_path
,
1475 enum rtrs_clt_state new_state
,
1476 enum rtrs_clt_state
*old_state
)
1480 spin_lock_irq(&clt_path
->state_wq
.lock
);
1482 *old_state
= clt_path
->state
;
1483 changed
= rtrs_clt_change_state(clt_path
, new_state
);
1484 spin_unlock_irq(&clt_path
->state_wq
.lock
);
1489 static void rtrs_clt_hb_err_handler(struct rtrs_con
*c
)
1491 struct rtrs_clt_con
*con
= container_of(c
, typeof(*con
), c
);
1492 struct rtrs_clt_path
*clt_path
= to_clt_path(con
->c
.path
);
1494 rtrs_err(con
->c
.path
, "HB err handler for path=%s\n", kobject_name(&clt_path
->kobj
));
1495 rtrs_rdma_error_recovery(con
);
1498 static void rtrs_clt_init_hb(struct rtrs_clt_path
*clt_path
)
1500 rtrs_init_hb(&clt_path
->s
, &io_comp_cqe
,
1501 RTRS_HB_INTERVAL_MS
,
1503 rtrs_clt_hb_err_handler
,
1507 static void rtrs_clt_reconnect_work(struct work_struct
*work
);
1508 static void rtrs_clt_close_work(struct work_struct
*work
);
1510 static void rtrs_clt_err_recovery_work(struct work_struct
*work
)
1512 struct rtrs_clt_path
*clt_path
;
1513 struct rtrs_clt_sess
*clt
;
1516 clt_path
= container_of(work
, struct rtrs_clt_path
, err_recovery_work
);
1517 clt
= clt_path
->clt
;
1518 delay_ms
= clt
->reconnect_delay_sec
* 1000;
1519 rtrs_clt_stop_and_destroy_conns(clt_path
);
1520 queue_delayed_work(rtrs_wq
, &clt_path
->reconnect_dwork
,
1521 msecs_to_jiffies(delay_ms
+
1522 get_random_u32_below(RTRS_RECONNECT_SEED
)));
1525 static struct rtrs_clt_path
*alloc_path(struct rtrs_clt_sess
*clt
,
1526 const struct rtrs_addr
*path
,
1527 size_t con_num
, u32 nr_poll_queues
)
1529 struct rtrs_clt_path
*clt_path
;
1534 clt_path
= kzalloc(sizeof(*clt_path
), GFP_KERNEL
);
1540 * +1: Extra connection for user messages
1542 total_con
= con_num
+ nr_poll_queues
+ 1;
1543 clt_path
->s
.con
= kcalloc(total_con
, sizeof(*clt_path
->s
.con
),
1545 if (!clt_path
->s
.con
)
1548 clt_path
->s
.con_num
= total_con
;
1549 clt_path
->s
.irq_con_num
= con_num
+ 1;
1551 clt_path
->stats
= kzalloc(sizeof(*clt_path
->stats
), GFP_KERNEL
);
1552 if (!clt_path
->stats
)
1555 mutex_init(&clt_path
->init_mutex
);
1556 uuid_gen(&clt_path
->s
.uuid
);
1557 memcpy(&clt_path
->s
.dst_addr
, path
->dst
,
1558 rdma_addr_size((struct sockaddr
*)path
->dst
));
1561 * rdma_resolve_addr() passes src_addr to cma_bind_addr, which
1562 * checks the sa_family to be non-zero. If user passed src_addr=NULL
1563 * the sess->src_addr will contain only zeros, which is then fine.
1566 memcpy(&clt_path
->s
.src_addr
, path
->src
,
1567 rdma_addr_size((struct sockaddr
*)path
->src
));
1568 strscpy(clt_path
->s
.sessname
, clt
->sessname
,
1569 sizeof(clt_path
->s
.sessname
));
1570 clt_path
->clt
= clt
;
1571 clt_path
->max_pages_per_mr
= RTRS_MAX_SEGMENTS
;
1572 init_waitqueue_head(&clt_path
->state_wq
);
1573 clt_path
->state
= RTRS_CLT_CONNECTING
;
1574 atomic_set(&clt_path
->connected_cnt
, 0);
1575 INIT_WORK(&clt_path
->close_work
, rtrs_clt_close_work
);
1576 INIT_WORK(&clt_path
->err_recovery_work
, rtrs_clt_err_recovery_work
);
1577 INIT_DELAYED_WORK(&clt_path
->reconnect_dwork
, rtrs_clt_reconnect_work
);
1578 rtrs_clt_init_hb(clt_path
);
1580 clt_path
->mp_skip_entry
= alloc_percpu(typeof(*clt_path
->mp_skip_entry
));
1581 if (!clt_path
->mp_skip_entry
)
1582 goto err_free_stats
;
1584 for_each_possible_cpu(cpu
)
1585 INIT_LIST_HEAD(per_cpu_ptr(clt_path
->mp_skip_entry
, cpu
));
1587 err
= rtrs_clt_init_stats(clt_path
->stats
);
1589 goto err_free_percpu
;
1594 free_percpu(clt_path
->mp_skip_entry
);
1596 kfree(clt_path
->stats
);
1598 kfree(clt_path
->s
.con
);
1602 return ERR_PTR(err
);
1605 void free_path(struct rtrs_clt_path
*clt_path
)
1607 free_percpu(clt_path
->mp_skip_entry
);
1608 mutex_destroy(&clt_path
->init_mutex
);
1609 kfree(clt_path
->s
.con
);
1610 kfree(clt_path
->rbufs
);
1614 static int create_con(struct rtrs_clt_path
*clt_path
, unsigned int cid
)
1616 struct rtrs_clt_con
*con
;
1618 con
= kzalloc(sizeof(*con
), GFP_KERNEL
);
1622 /* Map first two connections to the first CPU */
1623 con
->cpu
= (cid
? cid
- 1 : 0) % nr_cpu_ids
;
1625 con
->c
.path
= &clt_path
->s
;
1626 /* Align with srv, init as 1 */
1627 atomic_set(&con
->c
.wr_cnt
, 1);
1628 mutex_init(&con
->con_mutex
);
1630 clt_path
->s
.con
[cid
] = &con
->c
;
1635 static void destroy_con(struct rtrs_clt_con
*con
)
1637 struct rtrs_clt_path
*clt_path
= to_clt_path(con
->c
.path
);
1639 clt_path
->s
.con
[con
->c
.cid
] = NULL
;
1640 mutex_destroy(&con
->con_mutex
);
1644 static int create_con_cq_qp(struct rtrs_clt_con
*con
)
1646 struct rtrs_clt_path
*clt_path
= to_clt_path(con
->c
.path
);
1647 u32 max_send_wr
, max_recv_wr
, cq_num
, max_send_sge
, wr_limit
;
1649 struct rtrs_msg_rkey_rsp
*rsp
;
1651 lockdep_assert_held(&con
->con_mutex
);
1652 if (con
->c
.cid
== 0) {
1654 /* We must be the first here */
1655 if (WARN_ON(clt_path
->s
.dev
))
1659 * The whole session uses device from user connection.
1660 * Be careful not to close user connection before ib dev
1661 * is gracefully put.
1663 clt_path
->s
.dev
= rtrs_ib_dev_find_or_add(con
->c
.cm_id
->device
,
1665 if (!clt_path
->s
.dev
) {
1666 rtrs_wrn(clt_path
->clt
,
1667 "rtrs_ib_dev_find_get_or_add(): no memory\n");
1670 clt_path
->s
.dev_ref
= 1;
1671 query_fast_reg_mode(clt_path
);
1672 wr_limit
= clt_path
->s
.dev
->ib_dev
->attrs
.max_qp_wr
;
1674 * Two (request + registration) completion for send
1675 * Two for recv if always_invalidate is set on server
1677 * + 2 for drain and heartbeat
1678 * in case qp gets into error state.
1681 min_t(int, wr_limit
, SERVICE_CON_QUEUE_DEPTH
* 2 + 2);
1682 max_recv_wr
= max_send_wr
;
1685 * Here we assume that session members are correctly set.
1686 * This is always true if user connection (cid == 0) is
1687 * established first.
1689 if (WARN_ON(!clt_path
->s
.dev
))
1691 if (WARN_ON(!clt_path
->queue_depth
))
1694 wr_limit
= clt_path
->s
.dev
->ib_dev
->attrs
.max_qp_wr
;
1695 /* Shared between connections */
1696 clt_path
->s
.dev_ref
++;
1697 max_send_wr
= min_t(int, wr_limit
,
1698 /* QD * (REQ + RSP + FR REGS or INVS) + drain */
1699 clt_path
->queue_depth
* 4 + 1);
1700 max_recv_wr
= min_t(int, wr_limit
,
1701 clt_path
->queue_depth
* 3 + 1);
1704 atomic_set(&con
->c
.sq_wr_avail
, max_send_wr
);
1705 cq_num
= max_send_wr
+ max_recv_wr
;
1706 /* alloc iu to recv new rkey reply when server reports flags set */
1707 if (clt_path
->flags
& RTRS_MSG_NEW_RKEY_F
|| con
->c
.cid
== 0) {
1708 con
->rsp_ius
= rtrs_iu_alloc(cq_num
, sizeof(*rsp
),
1710 clt_path
->s
.dev
->ib_dev
,
1712 rtrs_clt_rdma_done
);
1715 con
->queue_num
= cq_num
;
1717 cq_vector
= con
->cpu
% clt_path
->s
.dev
->ib_dev
->num_comp_vectors
;
1718 if (con
->c
.cid
>= clt_path
->s
.irq_con_num
)
1719 err
= rtrs_cq_qp_create(&clt_path
->s
, &con
->c
, max_send_sge
,
1720 cq_vector
, cq_num
, max_send_wr
,
1721 max_recv_wr
, IB_POLL_DIRECT
);
1723 err
= rtrs_cq_qp_create(&clt_path
->s
, &con
->c
, max_send_sge
,
1724 cq_vector
, cq_num
, max_send_wr
,
1725 max_recv_wr
, IB_POLL_SOFTIRQ
);
1727 * In case of error we do not bother to clean previous allocations,
1728 * since destroy_con_cq_qp() must be called.
1733 static void destroy_con_cq_qp(struct rtrs_clt_con
*con
)
1735 struct rtrs_clt_path
*clt_path
= to_clt_path(con
->c
.path
);
1738 * Be careful here: destroy_con_cq_qp() can be called even
1739 * create_con_cq_qp() failed, see comments there.
1741 lockdep_assert_held(&con
->con_mutex
);
1742 rtrs_cq_qp_destroy(&con
->c
);
1744 rtrs_iu_free(con
->rsp_ius
, clt_path
->s
.dev
->ib_dev
,
1746 con
->rsp_ius
= NULL
;
1749 if (clt_path
->s
.dev_ref
&& !--clt_path
->s
.dev_ref
) {
1750 rtrs_ib_dev_put(clt_path
->s
.dev
);
1751 clt_path
->s
.dev
= NULL
;
1755 static void stop_cm(struct rtrs_clt_con
*con
)
1757 rdma_disconnect(con
->c
.cm_id
);
1759 ib_drain_qp(con
->c
.qp
);
1762 static void destroy_cm(struct rtrs_clt_con
*con
)
1764 rdma_destroy_id(con
->c
.cm_id
);
1765 con
->c
.cm_id
= NULL
;
1768 static int rtrs_rdma_addr_resolved(struct rtrs_clt_con
*con
)
1770 struct rtrs_path
*s
= con
->c
.path
;
1773 mutex_lock(&con
->con_mutex
);
1774 err
= create_con_cq_qp(con
);
1775 mutex_unlock(&con
->con_mutex
);
1777 rtrs_err(s
, "create_con_cq_qp(), err: %d\n", err
);
1780 err
= rdma_resolve_route(con
->c
.cm_id
, RTRS_CONNECT_TIMEOUT_MS
);
1782 rtrs_err(s
, "Resolving route failed, err: %d\n", err
);
1787 static int rtrs_rdma_route_resolved(struct rtrs_clt_con
*con
)
1789 struct rtrs_clt_path
*clt_path
= to_clt_path(con
->c
.path
);
1790 struct rtrs_clt_sess
*clt
= clt_path
->clt
;
1791 struct rtrs_msg_conn_req msg
;
1792 struct rdma_conn_param param
;
1796 param
= (struct rdma_conn_param
) {
1798 .rnr_retry_count
= 7,
1799 .private_data
= &msg
,
1800 .private_data_len
= sizeof(msg
),
1803 msg
= (struct rtrs_msg_conn_req
) {
1804 .magic
= cpu_to_le16(RTRS_MAGIC
),
1805 .version
= cpu_to_le16(RTRS_PROTO_VER
),
1806 .cid
= cpu_to_le16(con
->c
.cid
),
1807 .cid_num
= cpu_to_le16(clt_path
->s
.con_num
),
1808 .recon_cnt
= cpu_to_le16(clt_path
->s
.recon_cnt
),
1810 msg
.first_conn
= clt_path
->for_new_clt
? FIRST_CONN
: 0;
1811 uuid_copy(&msg
.sess_uuid
, &clt_path
->s
.uuid
);
1812 uuid_copy(&msg
.paths_uuid
, &clt
->paths_uuid
);
1814 err
= rdma_connect_locked(con
->c
.cm_id
, ¶m
);
1816 rtrs_err(clt
, "rdma_connect_locked(): %d\n", err
);
1821 static int rtrs_rdma_conn_established(struct rtrs_clt_con
*con
,
1822 struct rdma_cm_event
*ev
)
1824 struct rtrs_clt_path
*clt_path
= to_clt_path(con
->c
.path
);
1825 struct rtrs_clt_sess
*clt
= clt_path
->clt
;
1826 const struct rtrs_msg_conn_rsp
*msg
;
1827 u16 version
, queue_depth
;
1831 msg
= ev
->param
.conn
.private_data
;
1832 len
= ev
->param
.conn
.private_data_len
;
1833 if (len
< sizeof(*msg
)) {
1834 rtrs_err(clt
, "Invalid RTRS connection response\n");
1837 if (le16_to_cpu(msg
->magic
) != RTRS_MAGIC
) {
1838 rtrs_err(clt
, "Invalid RTRS magic\n");
1841 version
= le16_to_cpu(msg
->version
);
1842 if (version
>> 8 != RTRS_PROTO_VER_MAJOR
) {
1843 rtrs_err(clt
, "Unsupported major RTRS version: %d, expected %d\n",
1844 version
>> 8, RTRS_PROTO_VER_MAJOR
);
1847 errno
= le16_to_cpu(msg
->errno
);
1849 rtrs_err(clt
, "Invalid RTRS message: errno %d\n",
1853 if (con
->c
.cid
== 0) {
1854 queue_depth
= le16_to_cpu(msg
->queue_depth
);
1856 if (clt_path
->queue_depth
> 0 && queue_depth
!= clt_path
->queue_depth
) {
1857 rtrs_err(clt
, "Error: queue depth changed\n");
1860 * Stop any more reconnection attempts
1862 clt_path
->reconnect_attempts
= -1;
1864 "Disabling auto-reconnect. Trigger a manual reconnect after issue is resolved\n");
1868 if (!clt_path
->rbufs
) {
1869 clt_path
->rbufs
= kcalloc(queue_depth
,
1870 sizeof(*clt_path
->rbufs
),
1872 if (!clt_path
->rbufs
)
1875 clt_path
->queue_depth
= queue_depth
;
1876 clt_path
->s
.signal_interval
= min_not_zero(queue_depth
,
1877 (unsigned short) SERVICE_CON_QUEUE_DEPTH
);
1878 clt_path
->max_hdr_size
= le32_to_cpu(msg
->max_hdr_size
);
1879 clt_path
->max_io_size
= le32_to_cpu(msg
->max_io_size
);
1880 clt_path
->flags
= le32_to_cpu(msg
->flags
);
1881 clt_path
->chunk_size
= clt_path
->max_io_size
+ clt_path
->max_hdr_size
;
1884 * Global IO size is always a minimum.
1885 * If while a reconnection server sends us a value a bit
1886 * higher - client does not care and uses cached minimum.
1888 * Since we can have several sessions (paths) restablishing
1889 * connections in parallel, use lock.
1891 mutex_lock(&clt
->paths_mutex
);
1892 clt
->queue_depth
= clt_path
->queue_depth
;
1893 clt
->max_io_size
= min_not_zero(clt_path
->max_io_size
,
1895 mutex_unlock(&clt
->paths_mutex
);
1898 * Cache the hca_port and hca_name for sysfs
1900 clt_path
->hca_port
= con
->c
.cm_id
->port_num
;
1901 scnprintf(clt_path
->hca_name
, sizeof(clt_path
->hca_name
),
1902 clt_path
->s
.dev
->ib_dev
->name
);
1903 clt_path
->s
.src_addr
= con
->c
.cm_id
->route
.addr
.src_addr
;
1904 /* set for_new_clt, to allow future reconnect on any path */
1905 clt_path
->for_new_clt
= 1;
1911 static inline void flag_success_on_conn(struct rtrs_clt_con
*con
)
1913 struct rtrs_clt_path
*clt_path
= to_clt_path(con
->c
.path
);
1915 atomic_inc(&clt_path
->connected_cnt
);
1919 static int rtrs_rdma_conn_rejected(struct rtrs_clt_con
*con
,
1920 struct rdma_cm_event
*ev
)
1922 struct rtrs_path
*s
= con
->c
.path
;
1923 const struct rtrs_msg_conn_rsp
*msg
;
1924 const char *rej_msg
;
1928 status
= ev
->status
;
1929 rej_msg
= rdma_reject_msg(con
->c
.cm_id
, status
);
1930 msg
= rdma_consumer_reject_data(con
->c
.cm_id
, ev
, &data_len
);
1932 if (msg
&& data_len
>= sizeof(*msg
)) {
1933 errno
= (int16_t)le16_to_cpu(msg
->errno
);
1934 if (errno
== -EBUSY
)
1936 "Previous session is still exists on the server, please reconnect later\n");
1939 "Connect rejected: status %d (%s), rtrs errno %d\n",
1940 status
, rej_msg
, errno
);
1943 "Connect rejected but with malformed message: status %d (%s)\n",
1950 void rtrs_clt_close_conns(struct rtrs_clt_path
*clt_path
, bool wait
)
1952 trace_rtrs_clt_close_conns(clt_path
);
1954 if (rtrs_clt_change_state_get_old(clt_path
, RTRS_CLT_CLOSING
, NULL
))
1955 queue_work(rtrs_wq
, &clt_path
->close_work
);
1957 flush_work(&clt_path
->close_work
);
1960 static inline void flag_error_on_conn(struct rtrs_clt_con
*con
, int cm_err
)
1962 if (con
->cm_err
== 1) {
1963 struct rtrs_clt_path
*clt_path
;
1965 clt_path
= to_clt_path(con
->c
.path
);
1966 if (atomic_dec_and_test(&clt_path
->connected_cnt
))
1968 wake_up(&clt_path
->state_wq
);
1970 con
->cm_err
= cm_err
;
1973 static int rtrs_clt_rdma_cm_handler(struct rdma_cm_id
*cm_id
,
1974 struct rdma_cm_event
*ev
)
1976 struct rtrs_clt_con
*con
= cm_id
->context
;
1977 struct rtrs_path
*s
= con
->c
.path
;
1978 struct rtrs_clt_path
*clt_path
= to_clt_path(s
);
1981 switch (ev
->event
) {
1982 case RDMA_CM_EVENT_ADDR_RESOLVED
:
1983 cm_err
= rtrs_rdma_addr_resolved(con
);
1985 case RDMA_CM_EVENT_ROUTE_RESOLVED
:
1986 cm_err
= rtrs_rdma_route_resolved(con
);
1988 case RDMA_CM_EVENT_ESTABLISHED
:
1989 cm_err
= rtrs_rdma_conn_established(con
, ev
);
1992 * Report success and wake up. Here we abuse state_wq,
1993 * i.e. wake up without state change, but we set cm_err.
1995 flag_success_on_conn(con
);
1996 wake_up(&clt_path
->state_wq
);
2000 case RDMA_CM_EVENT_REJECTED
:
2001 cm_err
= rtrs_rdma_conn_rejected(con
, ev
);
2003 case RDMA_CM_EVENT_DISCONNECTED
:
2004 /* No message for disconnecting */
2005 cm_err
= -ECONNRESET
;
2007 case RDMA_CM_EVENT_CONNECT_ERROR
:
2008 case RDMA_CM_EVENT_UNREACHABLE
:
2009 case RDMA_CM_EVENT_ADDR_CHANGE
:
2010 case RDMA_CM_EVENT_TIMEWAIT_EXIT
:
2011 rtrs_wrn(s
, "CM error (CM event: %s, err: %d)\n",
2012 rdma_event_msg(ev
->event
), ev
->status
);
2013 cm_err
= -ECONNRESET
;
2015 case RDMA_CM_EVENT_ADDR_ERROR
:
2016 case RDMA_CM_EVENT_ROUTE_ERROR
:
2017 rtrs_wrn(s
, "CM error (CM event: %s, err: %d)\n",
2018 rdma_event_msg(ev
->event
), ev
->status
);
2019 cm_err
= -EHOSTUNREACH
;
2021 case RDMA_CM_EVENT_DEVICE_REMOVAL
:
2023 * Device removal is a special case. Queue close and return 0.
2025 rtrs_wrn_rl(s
, "CM event: %s, status: %d\n", rdma_event_msg(ev
->event
),
2027 rtrs_clt_close_conns(clt_path
, false);
2030 rtrs_err(s
, "Unexpected RDMA CM error (CM event: %s, err: %d)\n",
2031 rdma_event_msg(ev
->event
), ev
->status
);
2032 cm_err
= -ECONNRESET
;
2038 * cm error makes sense only on connection establishing,
2039 * in other cases we rely on normal procedure of reconnecting.
2041 flag_error_on_conn(con
, cm_err
);
2042 rtrs_rdma_error_recovery(con
);
2048 /* The caller should do the cleanup in case of error */
2049 static int create_cm(struct rtrs_clt_con
*con
)
2051 struct rtrs_path
*s
= con
->c
.path
;
2052 struct rtrs_clt_path
*clt_path
= to_clt_path(s
);
2053 struct rdma_cm_id
*cm_id
;
2056 cm_id
= rdma_create_id(&init_net
, rtrs_clt_rdma_cm_handler
, con
,
2057 clt_path
->s
.dst_addr
.ss_family
== AF_IB
?
2058 RDMA_PS_IB
: RDMA_PS_TCP
, IB_QPT_RC
);
2059 if (IS_ERR(cm_id
)) {
2060 rtrs_err(s
, "Failed to create CM ID, err: %pe\n", cm_id
);
2061 return PTR_ERR(cm_id
);
2063 con
->c
.cm_id
= cm_id
;
2065 /* allow the port to be reused */
2066 err
= rdma_set_reuseaddr(cm_id
, 1);
2068 rtrs_err(s
, "Set address reuse failed, err: %d\n", err
);
2071 err
= rdma_resolve_addr(cm_id
, (struct sockaddr
*)&clt_path
->s
.src_addr
,
2072 (struct sockaddr
*)&clt_path
->s
.dst_addr
,
2073 RTRS_CONNECT_TIMEOUT_MS
);
2075 rtrs_err(s
, "Failed to resolve address, err: %d\n", err
);
2079 * Combine connection status and session events. This is needed
2080 * for waiting two possible cases: cm_err has something meaningful
2081 * or session state was really changed to error by device removal.
2083 err
= wait_event_interruptible_timeout(
2085 con
->cm_err
|| clt_path
->state
!= RTRS_CLT_CONNECTING
,
2086 msecs_to_jiffies(RTRS_CONNECT_TIMEOUT_MS
));
2087 if (err
== 0 || err
== -ERESTARTSYS
) {
2090 /* Timedout or interrupted */
2093 if (con
->cm_err
< 0)
2095 if (READ_ONCE(clt_path
->state
) != RTRS_CLT_CONNECTING
)
2096 /* Device removal */
2097 return -ECONNABORTED
;
2102 static void rtrs_clt_path_up(struct rtrs_clt_path
*clt_path
)
2104 struct rtrs_clt_sess
*clt
= clt_path
->clt
;
2108 * We can fire RECONNECTED event only when all paths were
2109 * connected on rtrs_clt_open(), then each was disconnected
2110 * and the first one connected again. That's why this nasty
2111 * game with counter value.
2114 mutex_lock(&clt
->paths_ev_mutex
);
2115 up
= ++clt
->paths_up
;
2117 * Here it is safe to access paths num directly since up counter
2118 * is greater than MAX_PATHS_NUM only while rtrs_clt_open() is
2119 * in progress, thus paths removals are impossible.
2121 if (up
> MAX_PATHS_NUM
&& up
== MAX_PATHS_NUM
+ clt
->paths_num
)
2122 clt
->paths_up
= clt
->paths_num
;
2124 clt
->link_ev(clt
->priv
, RTRS_CLT_LINK_EV_RECONNECTED
);
2125 mutex_unlock(&clt
->paths_ev_mutex
);
2127 /* Mark session as established */
2128 clt_path
->established
= true;
2129 clt_path
->reconnect_attempts
= 0;
2130 clt_path
->stats
->reconnects
.successful_cnt
++;
2133 static void rtrs_clt_path_down(struct rtrs_clt_path
*clt_path
)
2135 struct rtrs_clt_sess
*clt
= clt_path
->clt
;
2137 if (!clt_path
->established
)
2140 clt_path
->established
= false;
2141 mutex_lock(&clt
->paths_ev_mutex
);
2142 WARN_ON(!clt
->paths_up
);
2143 if (--clt
->paths_up
== 0)
2144 clt
->link_ev(clt
->priv
, RTRS_CLT_LINK_EV_DISCONNECTED
);
2145 mutex_unlock(&clt
->paths_ev_mutex
);
2148 static void rtrs_clt_stop_and_destroy_conns(struct rtrs_clt_path
*clt_path
)
2150 struct rtrs_clt_con
*con
;
2153 WARN_ON(READ_ONCE(clt_path
->state
) == RTRS_CLT_CONNECTED
);
2156 * Possible race with rtrs_clt_open(), when DEVICE_REMOVAL comes
2157 * exactly in between. Start destroying after it finishes.
2159 mutex_lock(&clt_path
->init_mutex
);
2160 mutex_unlock(&clt_path
->init_mutex
);
2163 * All IO paths must observe !CONNECTED state before we
2168 rtrs_stop_hb(&clt_path
->s
);
2171 * The order it utterly crucial: firstly disconnect and complete all
2172 * rdma requests with error (thus set in_use=false for requests),
2173 * then fail outstanding requests checking in_use for each, and
2174 * eventually notify upper layer about session disconnection.
2177 for (cid
= 0; cid
< clt_path
->s
.con_num
; cid
++) {
2178 if (!clt_path
->s
.con
[cid
])
2180 con
= to_clt_con(clt_path
->s
.con
[cid
]);
2183 fail_all_outstanding_reqs(clt_path
);
2184 free_path_reqs(clt_path
);
2185 rtrs_clt_path_down(clt_path
);
2188 * Wait for graceful shutdown, namely when peer side invokes
2189 * rdma_disconnect(). 'connected_cnt' is decremented only on
2190 * CM events, thus if other side had crashed and hb has detected
2191 * something is wrong, here we will stuck for exactly timeout ms,
2192 * since CM does not fire anything. That is fine, we are not in
2195 wait_event_timeout(clt_path
->state_wq
,
2196 !atomic_read(&clt_path
->connected_cnt
),
2197 msecs_to_jiffies(RTRS_CONNECT_TIMEOUT_MS
));
2199 for (cid
= 0; cid
< clt_path
->s
.con_num
; cid
++) {
2200 if (!clt_path
->s
.con
[cid
])
2202 con
= to_clt_con(clt_path
->s
.con
[cid
]);
2203 mutex_lock(&con
->con_mutex
);
2204 destroy_con_cq_qp(con
);
2205 mutex_unlock(&con
->con_mutex
);
static void rtrs_clt_remove_path_from_arr(struct rtrs_clt_path *clt_path)
{
	struct rtrs_clt_sess *clt = clt_path->clt;
	struct rtrs_clt_path *next;
	bool wait_for_grace = false;
	int cpu;

	mutex_lock(&clt->paths_mutex);
	list_del_rcu(&clt_path->s.entry);

	/* Make sure everybody observes path removal. */
	synchronize_rcu();

	/*
	 * At this point nobody sees @sess in the list, but still we have
	 * dangling pointer @pcpu_path which _can_ point to @sess.  Since
	 * nobody can observe @sess in the list, we guarantee that IO path
	 * will not assign @sess to @pcpu_path, i.e. @pcpu_path can be equal
	 * to @sess, but can never again become @sess.
	 */

	/*
	 * Decrement paths number only after grace period, because
	 * caller of do_each_path() must firstly observe list without
	 * path and only then the decremented paths number.
	 *
	 * Otherwise there can be the following situation:
	 *    o Two paths exist and IO is coming.
	 *    o One path is removed:
	 *      CPU#0                          CPU#1
	 *      do_each_path():                rtrs_clt_remove_path_from_arr():
	 *          path = get_next_path()
	 *          ^^^                        list_del_rcu(path)
	 *          [!CONNECTED path]          clt->paths_num--
	 *                                              ^^^^^^^^^
	 *          load clt->paths_num        from 2 to 1
	 *                    ^^^^^^^^^
	 *                    sees 1
	 *
	 *      path is observed as !CONNECTED, but do_each_path() loop
	 *      ends, because expression i < clt->paths_num is false.
	 */
	clt->paths_num--;

	/*
	 * Get @next connection from current @sess which is going to be
	 * removed.  If @sess is the last element, then @next is NULL.
	 */
	rcu_read_lock();
	next = rtrs_clt_get_next_path_or_null(&clt->paths_list, clt_path);
	rcu_read_unlock();

	/*
	 * @pcpu paths can still point to the path which is going to be
	 * removed, so change the pointer manually.
	 */
	for_each_possible_cpu(cpu) {
		struct rtrs_clt_path __rcu **ppcpu_path;

		ppcpu_path = per_cpu_ptr(clt->pcpu_path, cpu);
		if (rcu_dereference_protected(*ppcpu_path,
			lockdep_is_held(&clt->paths_mutex)) != clt_path)
			/*
			 * synchronize_rcu() was called just after deleting
			 * entry from the list, thus IO code path cannot
			 * change pointer back to the pointer which is going
			 * to be removed, we are safe here.
			 */
			continue;

		/*
		 * We race with IO code path, which also changes pointer,
		 * thus we have to be careful not to overwrite it.
		 */
		if (try_cmpxchg((struct rtrs_clt_path **)ppcpu_path, &clt_path,
				next))
			/*
			 * @ppcpu_path was successfully replaced with @next,
			 * that means that someone could also pick up the
			 * @sess and dereference it right now, so waiting for
			 * a grace period is required.
			 */
			wait_for_grace = true;
	}
	if (wait_for_grace)
		synchronize_rcu();

	mutex_unlock(&clt->paths_mutex);
}
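/*
 * Example of the per-CPU fix-up above (two CPUs, purely illustrative):
 * CPU0's cached pointer still refers to the path being removed and is
 * swapped to @next by the cmpxchg, so a grace period is needed before the
 * path memory may go away; CPU1's pointer already refers to another path
 * (or was changed concurrently by the IO path), so it is left untouched.
 */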
static void rtrs_clt_add_path_to_arr(struct rtrs_clt_path *clt_path)
{
	struct rtrs_clt_sess *clt = clt_path->clt;

	mutex_lock(&clt->paths_mutex);
	clt->paths_num++;

	list_add_tail_rcu(&clt_path->s.entry, &clt->paths_list);
	mutex_unlock(&clt->paths_mutex);
}
static void rtrs_clt_close_work(struct work_struct *work)
{
	struct rtrs_clt_path *clt_path;

	clt_path = container_of(work, struct rtrs_clt_path, close_work);

	cancel_work_sync(&clt_path->err_recovery_work);
	cancel_delayed_work_sync(&clt_path->reconnect_dwork);
	rtrs_clt_stop_and_destroy_conns(clt_path);
	rtrs_clt_change_state_get_old(clt_path, RTRS_CLT_CLOSED, NULL);
}
static int init_conns(struct rtrs_clt_path *clt_path)
{
	unsigned int cid;
	int err, i;

	/*
	 * On every new session connections increase reconnect counter
	 * to avoid clashes with previous sessions not yet closed
	 * on the server side.
	 */
	clt_path->s.recon_cnt++;

	/* Establish all RDMA connections */
	for (cid = 0; cid < clt_path->s.con_num; cid++) {
		err = create_con(clt_path, cid);
		if (err)
			goto destroy;

		err = create_cm(to_clt_con(clt_path->s.con[cid]));
		if (err)
			goto destroy;
	}

	/*
	 * Set the cid to con_num - 1, since if we fail later, we want to stay in bounds.
	 */
	cid = clt_path->s.con_num - 1;

	err = alloc_path_reqs(clt_path);
	if (err)
		goto destroy;

	return 0;

destroy:
	/* Make sure we do the cleanup in the order they are created */
	for (i = 0; i <= cid; i++) {
		struct rtrs_clt_con *con;

		if (!clt_path->s.con[i])
			break;

		con = to_clt_con(clt_path->s.con[i]);
		if (con->c.cm_id) {
			stop_cm(con);
			mutex_lock(&con->con_mutex);
			destroy_con_cq_qp(con);
			mutex_unlock(&con->con_mutex);
			destroy_cm(con);
		}
		destroy_con(con);
	}
	/*
	 * If we've never taken the async path and got an error, say,
	 * doing rdma_resolve_addr(), switch to RTRS_CLT_CONNECTING_ERR state
	 * manually to keep reconnecting.
	 */
	rtrs_clt_change_state_get_old(clt_path, RTRS_CLT_CONNECTING_ERR, NULL);

	return err;
}
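/*
 * Note: as far as the wire protocol goes, the recon_cnt bumped at the top of
 * init_conns() travels with the CM connect request, which is what lets the
 * server side tell a fresh incarnation of a path apart from a previous one
 * it has not finished cleaning up yet.
 */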
static void rtrs_clt_info_req_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rtrs_clt_con *con = to_clt_con(wc->qp->qp_context);
	struct rtrs_clt_path *clt_path = to_clt_path(con->c.path);
	struct rtrs_iu *iu;

	iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
	rtrs_iu_free(iu, clt_path->s.dev->ib_dev, 1);

	if (wc->status != IB_WC_SUCCESS) {
		rtrs_err(clt_path->clt, "Path info request send failed: %s\n",
			 ib_wc_status_msg(wc->status));
		rtrs_clt_change_state_get_old(clt_path, RTRS_CLT_CONNECTING_ERR, NULL);
		return;
	}

	rtrs_clt_update_wc_stats(con);
}
static int process_info_rsp(struct rtrs_clt_path *clt_path,
			    const struct rtrs_msg_info_rsp *msg)
{
	unsigned int sg_cnt, total_len;
	int i, sgi;

	sg_cnt = le16_to_cpu(msg->sg_cnt);
	if (!sg_cnt || (clt_path->queue_depth % sg_cnt)) {
		rtrs_err(clt_path->clt,
			 "Incorrect sg_cnt %d, is not multiple\n",
			 sg_cnt);
		return -EINVAL;
	}

	/*
	 * Check if IB immediate data size is enough to hold the mem_id and
	 * the offset inside the memory chunk.
	 */
	if ((ilog2(sg_cnt - 1) + 1) + (ilog2(clt_path->chunk_size - 1) + 1) >
	    MAX_IMM_PAYL_BITS) {
		rtrs_err(clt_path->clt,
			 "RDMA immediate size (%db) not enough to encode %d buffers of size %dB\n",
			 MAX_IMM_PAYL_BITS, sg_cnt, clt_path->chunk_size);
		return -EINVAL;
	}
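	/*
	 * Worked example (numbers purely illustrative, not taken from this
	 * file): 512 server buffers need ilog2(511) + 1 = 9 bits for the
	 * buffer id and a 128 KiB chunk needs ilog2(131071) + 1 = 17 bits
	 * for the offset, 26 bits in total, which fits if MAX_IMM_PAYL_BITS
	 * is 28 (the usual split of the 32-bit immediate).  512 KiB chunks
	 * with 4096 buffers would need 19 + 12 = 31 bits and be rejected by
	 * the check above.
	 */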
	total_len = 0;
	for (sgi = 0, i = 0; sgi < sg_cnt && i < clt_path->queue_depth; sgi++) {
		const struct rtrs_sg_desc *desc = &msg->desc[sgi];
		u32 len, rkey;
		u64 addr;

		addr = le64_to_cpu(desc->addr);
		rkey = le32_to_cpu(desc->key);
		len  = le32_to_cpu(desc->len);

		total_len += len;

		if (!len || (len % clt_path->chunk_size)) {
			rtrs_err(clt_path->clt, "Incorrect [%d].len %d\n",
				 sgi, len);
			return -EINVAL;
		}
		for ( ; len && i < clt_path->queue_depth; i++) {
			clt_path->rbufs[i].addr = addr;
			clt_path->rbufs[i].rkey = rkey;

			len  -= clt_path->chunk_size;
			addr += clt_path->chunk_size;
		}
	}
	if (sgi != sg_cnt || i != clt_path->queue_depth) {
		rtrs_err(clt_path->clt,
			 "Incorrect sg vector, not fully mapped\n");
		return -EINVAL;
	}
	if (total_len != clt_path->chunk_size * clt_path->queue_depth) {
		rtrs_err(clt_path->clt, "Incorrect total_len %d\n", total_len);
		return -EINVAL;
	}

	return 0;
}
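/*
 * Example for process_info_rsp() above: a single descriptor with
 * len == 4 * chunk_size and address A fills four consecutive rbufs entries
 * with addresses A, A + chunk_size, A + 2 * chunk_size and
 * A + 3 * chunk_size, all sharing the descriptor's rkey.
 */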
static void rtrs_clt_info_rsp_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rtrs_clt_con *con = to_clt_con(wc->qp->qp_context);
	struct rtrs_clt_path *clt_path = to_clt_path(con->c.path);
	struct rtrs_msg_info_rsp *msg;
	enum rtrs_clt_state state;
	struct rtrs_iu *iu;
	size_t rx_sz;
	int err;

	state = RTRS_CLT_CONNECTING_ERR;

	WARN_ON(con->c.cid);
	iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
	if (wc->status != IB_WC_SUCCESS) {
		rtrs_err(clt_path->clt, "Path info response recv failed: %s\n",
			 ib_wc_status_msg(wc->status));
		goto out;
	}
	WARN_ON(wc->opcode != IB_WC_RECV);

	if (wc->byte_len < sizeof(*msg)) {
		rtrs_err(clt_path->clt, "Path info response is malformed: size %d\n",
			 wc->byte_len);
		goto out;
	}
	ib_dma_sync_single_for_cpu(clt_path->s.dev->ib_dev, iu->dma_addr,
				   iu->size, DMA_FROM_DEVICE);
	msg = iu->buf;
	if (le16_to_cpu(msg->type) != RTRS_MSG_INFO_RSP) {
		rtrs_err(clt_path->clt, "Path info response is malformed: type %d\n",
			 le16_to_cpu(msg->type));
		goto out;
	}
	rx_sz  = sizeof(*msg);
	rx_sz += sizeof(msg->desc[0]) * le16_to_cpu(msg->sg_cnt);
	if (wc->byte_len < rx_sz) {
		rtrs_err(clt_path->clt, "Path info response is malformed: size %d\n",
			 wc->byte_len);
		goto out;
	}
	err = process_info_rsp(clt_path, msg);
	if (err)
		goto out;

	err = post_recv_path(clt_path);
	if (err)
		goto out;

	state = RTRS_CLT_CONNECTED;

out:
	rtrs_clt_update_wc_stats(con);
	rtrs_iu_free(iu, clt_path->s.dev->ib_dev, 1);
	rtrs_clt_change_state_get_old(clt_path, state, NULL);
}
static int rtrs_send_path_info(struct rtrs_clt_path *clt_path)
{
	struct rtrs_clt_con *usr_con = to_clt_con(clt_path->s.con[0]);
	struct rtrs_msg_info_req *msg;
	struct rtrs_iu *tx_iu, *rx_iu;
	size_t rx_sz;
	int err;

	rx_sz  = sizeof(struct rtrs_msg_info_rsp);
	rx_sz += sizeof(struct rtrs_sg_desc) * clt_path->queue_depth;

	tx_iu = rtrs_iu_alloc(1, sizeof(struct rtrs_msg_info_req), GFP_KERNEL,
			      clt_path->s.dev->ib_dev, DMA_TO_DEVICE,
			      rtrs_clt_info_req_done);
	rx_iu = rtrs_iu_alloc(1, rx_sz, GFP_KERNEL, clt_path->s.dev->ib_dev,
			      DMA_FROM_DEVICE, rtrs_clt_info_rsp_done);
	if (!tx_iu || !rx_iu) {
		err = -ENOMEM;
		goto out;
	}
	/* Prepare for getting info response */
	err = rtrs_iu_post_recv(&usr_con->c, rx_iu);
	if (err) {
		rtrs_err(clt_path->clt, "rtrs_iu_post_recv(), err: %d\n", err);
		goto out;
	}
	rx_iu = NULL;

	msg = tx_iu->buf;
	msg->type = cpu_to_le16(RTRS_MSG_INFO_REQ);
	memcpy(msg->pathname, clt_path->s.sessname, sizeof(msg->pathname));

	ib_dma_sync_single_for_device(clt_path->s.dev->ib_dev,
				      tx_iu->dma_addr,
				      tx_iu->size, DMA_TO_DEVICE);

	/* Send info request */
	err = rtrs_iu_post_send(&usr_con->c, tx_iu, sizeof(*msg), NULL);
	if (err) {
		rtrs_err(clt_path->clt, "rtrs_iu_post_send(), err: %d\n", err);
		goto out;
	}
	tx_iu = NULL;

	/* Wait for state change */
	wait_event_interruptible_timeout(clt_path->state_wq,
					 clt_path->state != RTRS_CLT_CONNECTING,
					 msecs_to_jiffies(
						 RTRS_CONNECT_TIMEOUT_MS));
	if (READ_ONCE(clt_path->state) != RTRS_CLT_CONNECTED) {
		if (READ_ONCE(clt_path->state) == RTRS_CLT_CONNECTING_ERR)
			err = -ECONNRESET;
		else
			err = -ETIMEDOUT;
	}

out:
	if (tx_iu)
		rtrs_iu_free(tx_iu, clt_path->s.dev->ib_dev, 1);
	if (rx_iu)
		rtrs_iu_free(rx_iu, clt_path->s.dev->ib_dev, 1);
	if (err)
		/* If we've never taken the async path because of malloc problems */
		rtrs_clt_change_state_get_old(clt_path,
					      RTRS_CLT_CONNECTING_ERR, NULL);

	return err;
}
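/*
 * Handshake summary: rtrs_send_path_info() posts the receive buffer for the
 * info response first, then sends the info request and waits for the path
 * state to leave CONNECTING.  The actual state transition is driven by
 * rtrs_clt_info_req_done()/rtrs_clt_info_rsp_done() above, so the waiter
 * only inspects the resulting state.
 */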
/**
 * init_path() - establishes all path connections and does handshake
 * @clt_path: client path.
 * In case of error full close or reconnect procedure should be taken,
 * because reconnect or close async works can be started.
 */
static int init_path(struct rtrs_clt_path *clt_path)
{
	int err;
	char str[NAME_MAX];
	struct rtrs_addr path = {
		.src = &clt_path->s.src_addr,
		.dst = &clt_path->s.dst_addr,
	};

	rtrs_addr_to_str(&path, str, sizeof(str));

	mutex_lock(&clt_path->init_mutex);
	err = init_conns(clt_path);
	if (err) {
		rtrs_err(clt_path->clt,
			 "init_conns() failed: err=%d path=%s [%s:%u]\n", err,
			 str, clt_path->hca_name, clt_path->hca_port);
		goto out;
	}
	err = rtrs_send_path_info(clt_path);
	if (err) {
		rtrs_err(clt_path->clt,
			 "rtrs_send_path_info() failed: err=%d path=%s [%s:%u]\n",
			 err, str, clt_path->hca_name, clt_path->hca_port);
		goto out;
	}
	rtrs_clt_path_up(clt_path);
	rtrs_start_hb(&clt_path->s);
out:
	mutex_unlock(&clt_path->init_mutex);

	return err;
}
static void rtrs_clt_reconnect_work(struct work_struct *work)
{
	struct rtrs_clt_path *clt_path;
	struct rtrs_clt_sess *clt;
	int err;

	clt_path = container_of(to_delayed_work(work), struct rtrs_clt_path,
				reconnect_dwork);
	clt = clt_path->clt;

	trace_rtrs_clt_reconnect_work(clt_path);

	if (READ_ONCE(clt_path->state) != RTRS_CLT_RECONNECTING)
		return;

	if (clt_path->reconnect_attempts >= clt->max_reconnect_attempts) {
		/* Close a path completely if max attempts is reached */
		rtrs_clt_close_conns(clt_path, false);
		return;
	}
	clt_path->reconnect_attempts++;

	msleep(RTRS_RECONNECT_BACKOFF);
	if (rtrs_clt_change_state_get_old(clt_path, RTRS_CLT_CONNECTING, NULL)) {
		err = init_path(clt_path);
		if (err)
			goto reconnect_again;
	}

	return;

reconnect_again:
	if (rtrs_clt_change_state_get_old(clt_path, RTRS_CLT_RECONNECTING, NULL)) {
		clt_path->stats->reconnects.fail_cnt++;
		queue_work(rtrs_wq, &clt_path->err_recovery_work);
	}
}
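/*
 * Reconnect pacing: every attempt sleeps for RTRS_RECONNECT_BACKOFF on top
 * of the delayed-work delay chosen by the caller, so failed paths are
 * retried at a bounded rate rather than in a tight loop; once
 * max_reconnect_attempts is exceeded the path is closed for good.
 */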
static void rtrs_clt_dev_release(struct device *dev)
{
	struct rtrs_clt_sess *clt = container_of(dev, struct rtrs_clt_sess,
						 dev);

	mutex_destroy(&clt->paths_ev_mutex);
	mutex_destroy(&clt->paths_mutex);
	kfree(clt);
}
static struct rtrs_clt_sess *alloc_clt(const char *sessname, size_t paths_num,
				       u16 port, size_t pdu_sz, void *priv,
				       void (*link_ev)(void *priv,
						       enum rtrs_clt_link_ev ev),
				       unsigned int reconnect_delay_sec,
				       unsigned int max_reconnect_attempts)
{
	struct rtrs_clt_sess *clt;
	int err;

	if (!paths_num || paths_num > MAX_PATHS_NUM)
		return ERR_PTR(-EINVAL);

	if (strlen(sessname) >= sizeof(clt->sessname))
		return ERR_PTR(-EINVAL);

	clt = kzalloc(sizeof(*clt), GFP_KERNEL);
	if (!clt)
		return ERR_PTR(-ENOMEM);

	clt->pcpu_path = alloc_percpu(typeof(*clt->pcpu_path));
	if (!clt->pcpu_path) {
		kfree(clt);
		return ERR_PTR(-ENOMEM);
	}

	clt->dev.class = &rtrs_clt_dev_class;
	clt->dev.release = rtrs_clt_dev_release;
	uuid_gen(&clt->paths_uuid);
	INIT_LIST_HEAD_RCU(&clt->paths_list);
	clt->paths_num = paths_num;
	clt->paths_up = MAX_PATHS_NUM;
	clt->port = port;
	clt->pdu_sz = pdu_sz;
	clt->max_segments = RTRS_MAX_SEGMENTS;
	clt->reconnect_delay_sec = reconnect_delay_sec;
	clt->max_reconnect_attempts = max_reconnect_attempts;
	clt->priv = priv;
	clt->link_ev = link_ev;
	clt->mp_policy = MP_POLICY_MIN_INFLIGHT;
	strscpy(clt->sessname, sessname, sizeof(clt->sessname));
	init_waitqueue_head(&clt->permits_wait);
	mutex_init(&clt->paths_ev_mutex);
	mutex_init(&clt->paths_mutex);
	device_initialize(&clt->dev);

	err = dev_set_name(&clt->dev, "%s", sessname);
	if (err)
		goto err_put;

	/*
	 * Suppress user space notification until
	 * sysfs files are created
	 */
	dev_set_uevent_suppress(&clt->dev, true);
	err = device_add(&clt->dev);
	if (err)
		goto err_put;

	clt->kobj_paths = kobject_create_and_add("paths", &clt->dev.kobj);
	if (!clt->kobj_paths) {
		err = -ENOMEM;
		goto err_del;
	}
	err = rtrs_clt_create_sysfs_root_files(clt);
	if (err) {
		kobject_del(clt->kobj_paths);
		kobject_put(clt->kobj_paths);
		goto err_del;
	}
	dev_set_uevent_suppress(&clt->dev, false);
	kobject_uevent(&clt->dev.kobj, KOBJ_ADD);

	return clt;
err_del:
	device_del(&clt->dev);
err_put:
	free_percpu(clt->pcpu_path);
	put_device(&clt->dev);
	return ERR_PTR(err);
}
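/*
 * Ownership note for alloc_clt(): once device_initialize() has run, @clt is
 * freed through the device refcount.  The error paths end in put_device(),
 * which invokes rtrs_clt_dev_release() and frees @clt, so there is
 * intentionally no kfree() on those paths.
 */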
static void free_clt(struct rtrs_clt_sess *clt)
{
	free_percpu(clt->pcpu_path);

	/*
	 * release callback will free clt and destroy mutexes in last put
	 */
	device_unregister(&clt->dev);
}
/**
 * rtrs_clt_open() - Open a path to an RTRS server
 * @ops: holds the link event callback and the private pointer.
 * @pathname: name of the path to an RTRS server
 * @paths: Paths to be established defined by their src and dst addresses
 * @paths_num: Number of elements in the @paths array
 * @port: port to be used by the RTRS session
 * @pdu_sz: Size of extra payload which can be accessed after permit allocation.
 * @reconnect_delay_sec: time between reconnect tries
 * @max_reconnect_attempts: Number of times to reconnect on error before giving
 *			    up, 0 for disabled, -1 for forever
 * @nr_poll_queues: number of polling mode connections using the IB_POLL_DIRECT flag
 *
 * Starts session establishment with the rtrs_server. The function can block
 * up to ~2000ms before it returns.
 *
 * Return: a valid pointer on success, otherwise PTR_ERR.
 */
struct rtrs_clt_sess *rtrs_clt_open(struct rtrs_clt_ops *ops,
				    const char *pathname,
				    const struct rtrs_addr *paths,
				    size_t paths_num, u16 port,
				    size_t pdu_sz, u8 reconnect_delay_sec,
				    s16 max_reconnect_attempts, u32 nr_poll_queues)
{
	struct rtrs_clt_path *clt_path, *tmp;
	struct rtrs_clt_sess *clt;
	int err, i;

	if (strchr(pathname, '/') || strchr(pathname, '.')) {
		pr_err("pathname cannot contain / and .\n");
		err = -EINVAL;
		goto out;
	}

	clt = alloc_clt(pathname, paths_num, port, pdu_sz, ops->priv,
			ops->link_ev,
			reconnect_delay_sec,
			max_reconnect_attempts);
	if (IS_ERR(clt)) {
		err = PTR_ERR(clt);
		goto out;
	}
	for (i = 0; i < paths_num; i++) {
		struct rtrs_clt_path *clt_path;

		clt_path = alloc_path(clt, &paths[i], nr_cpu_ids,
				      nr_poll_queues);
		if (IS_ERR(clt_path)) {
			err = PTR_ERR(clt_path);
			goto close_all_path;
		}
		clt_path->for_new_clt = 1;
		list_add_tail_rcu(&clt_path->s.entry, &clt->paths_list);

		err = init_path(clt_path);
		if (err) {
			list_del_rcu(&clt_path->s.entry);
			rtrs_clt_close_conns(clt_path, true);
			free_percpu(clt_path->stats->pcpu_stats);
			kfree(clt_path->stats);
			free_path(clt_path);
			goto close_all_path;
		}

		err = rtrs_clt_create_path_files(clt_path);
		if (err) {
			list_del_rcu(&clt_path->s.entry);
			rtrs_clt_close_conns(clt_path, true);
			free_percpu(clt_path->stats->pcpu_stats);
			kfree(clt_path->stats);
			free_path(clt_path);
			goto close_all_path;
		}
	}
	err = alloc_permits(clt);
	if (err)
		goto close_all_path;

	return clt;

close_all_path:
	list_for_each_entry_safe(clt_path, tmp, &clt->paths_list, s.entry) {
		rtrs_clt_destroy_path_files(clt_path, NULL);
		rtrs_clt_close_conns(clt_path, true);
		kobject_put(&clt_path->kobj);
	}
	rtrs_clt_destroy_sysfs_root(clt);
	free_clt(clt);

out:
	return ERR_PTR(err);
}
EXPORT_SYMBOL(rtrs_clt_open);
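/*
 * Rough usage sketch for the client API exported here (illustrative only;
 * my_priv, my_link_ev and the numeric arguments are hypothetical and not
 * part of this file):
 *
 *	struct rtrs_clt_ops ops = {
 *		.priv = my_priv,
 *		.link_ev = my_link_ev,
 *	};
 *	struct rtrs_clt_sess *sess;
 *
 *	sess = rtrs_clt_open(&ops, "session-name", paths, paths_num, port,
 *			     pdu_sz, 30, 5, 0);
 *	if (IS_ERR(sess))
 *		return PTR_ERR(sess);
 *	...
 *	rtrs_clt_close(sess);
 */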
/**
 * rtrs_clt_close() - Close a path
 * @clt: Session handle. Session is freed upon return.
 */
void rtrs_clt_close(struct rtrs_clt_sess *clt)
{
	struct rtrs_clt_path *clt_path, *tmp;

	/* Firstly forbid sysfs access */
	rtrs_clt_destroy_sysfs_root(clt);

	/* Now it is safe to iterate over all paths without locks */
	list_for_each_entry_safe(clt_path, tmp, &clt->paths_list, s.entry) {
		rtrs_clt_close_conns(clt_path, true);
		rtrs_clt_destroy_path_files(clt_path, NULL);
		kobject_put(&clt_path->kobj);
	}
	free_permits(clt);
	free_clt(clt);
}
EXPORT_SYMBOL(rtrs_clt_close);
int rtrs_clt_reconnect_from_sysfs(struct rtrs_clt_path *clt_path)
{
	enum rtrs_clt_state old_state;
	int err = -EBUSY;
	bool changed;

	changed = rtrs_clt_change_state_get_old(clt_path,
						RTRS_CLT_RECONNECTING,
						&old_state);
	if (changed) {
		clt_path->reconnect_attempts = 0;
		rtrs_clt_stop_and_destroy_conns(clt_path);
		queue_delayed_work(rtrs_wq, &clt_path->reconnect_dwork, 0);
	}
	if (changed || old_state == RTRS_CLT_RECONNECTING) {
		/*
		 * flush_delayed_work() queues pending work for immediate
		 * execution, so do the flush if we have queued something
		 * right now or the work is pending.
		 */
		flush_delayed_work(&clt_path->reconnect_dwork);
		err = (READ_ONCE(clt_path->state) ==
		       RTRS_CLT_CONNECTED ? 0 : -ENOTCONN);
	}

	return err;
}
int rtrs_clt_remove_path_from_sysfs(struct rtrs_clt_path *clt_path,
				    const struct attribute *sysfs_self)
{
	enum rtrs_clt_state old_state;
	bool changed;

	/*
	 * Continue stopping the path till its state was changed to DEAD or
	 * was observed as DEAD:
	 * 1. State was changed to DEAD - we were fast and nobody
	 *    invoked rtrs_clt_reconnect(), which can again start
	 *    reconnecting.
	 * 2. State was observed as DEAD - we have someone in parallel
	 *    removing the path.
	 */
	do {
		rtrs_clt_close_conns(clt_path, true);
		changed = rtrs_clt_change_state_get_old(clt_path,
							RTRS_CLT_DEAD,
							&old_state);
	} while (!changed && old_state != RTRS_CLT_DEAD);

	if (changed) {
		rtrs_clt_remove_path_from_arr(clt_path);
		rtrs_clt_destroy_path_files(clt_path, sysfs_self);
		kobject_put(&clt_path->kobj);
	}

	return 0;
}
void rtrs_clt_set_max_reconnect_attempts(struct rtrs_clt_sess *clt, int value)
{
	clt->max_reconnect_attempts = (unsigned int)value;
}

int rtrs_clt_get_max_reconnect_attempts(const struct rtrs_clt_sess *clt)
{
	return (int)clt->max_reconnect_attempts;
}
/**
 * rtrs_clt_request() - Request data transfer to/from server via RDMA.
 *
 * @dir:	READ/WRITE
 * @ops:	callback function to be called as confirmation, and the pointer.
 * @clt:	Session
 * @permit:	Preallocated permit
 * @vec:	Message that is sent to server together with the request.
 *		Sum of len of all @vec elements limited to <= IO_MSG_SIZE.
 *		Since the msg is copied internally it can be allocated on the stack.
 * @nr:		Number of elements in @vec.
 * @data_len:	length of data sent to/from server
 * @sg:		Pages to be sent/received to/from server.
 * @sg_cnt:	Number of elements in @sg
 *
 * Return:
 * 0:		Success
 * <0:		Error
 *
 * On dir=READ rtrs client will request a data transfer from server to client.
 * The data that the server will respond with will be stored in @sg when
 * the user receives an %RTRS_CLT_RDMA_EV_RDMA_REQUEST_WRITE_COMPL event.
 * On dir=WRITE rtrs client will RDMA-write the data in @sg to the server side.
 */
int rtrs_clt_request(int dir, struct rtrs_clt_req_ops *ops,
		     struct rtrs_clt_sess *clt, struct rtrs_permit *permit,
		     const struct kvec *vec, size_t nr, size_t data_len,
		     struct scatterlist *sg, unsigned int sg_cnt)
{
	struct rtrs_clt_io_req *req;
	struct rtrs_clt_path *clt_path;

	enum dma_data_direction dma_dir;
	int err = -ECONNABORTED, i;
	size_t usr_len, hdr_len;
	struct path_it it;

	/* Get kvec length */
	for (i = 0, usr_len = 0; i < nr; i++)
		usr_len += vec[i].iov_len;

	if (dir == READ) {
		hdr_len = sizeof(struct rtrs_msg_rdma_read) +
			  sg_cnt * sizeof(struct rtrs_sg_desc);
		dma_dir = DMA_FROM_DEVICE;
	} else {
		hdr_len = sizeof(struct rtrs_msg_rdma_write);
		dma_dir = DMA_TO_DEVICE;
	}

	rcu_read_lock();
	for (path_it_init(&it, clt);
	     (clt_path = it.next_path(&it)) && it.i < it.clt->paths_num; it.i++) {
		if (READ_ONCE(clt_path->state) != RTRS_CLT_CONNECTED)
			continue;

		if (usr_len + hdr_len > clt_path->max_hdr_size) {
			rtrs_wrn_rl(clt_path->clt,
				    "%s request failed, user message size is %zu and header length %zu, but max size is %u\n",
				    dir == READ ? "Read" : "Write",
				    usr_len, hdr_len, clt_path->max_hdr_size);
			err = -EMSGSIZE;
			break;
		}
		req = rtrs_clt_get_req(clt_path, ops->conf_fn, permit, ops->priv,
				       vec, usr_len, sg, sg_cnt, data_len,
				       dma_dir);
		if (dir == READ)
			err = rtrs_clt_read_req(req);
		else
			err = rtrs_clt_write_req(req);
		if (err) {
			req->in_use = false;
			continue;
		}
		/* Success path */
		break;
	}
	path_it_deinit(&it);
	rcu_read_unlock();

	return err;
}
EXPORT_SYMBOL(rtrs_clt_request);
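/*
 * Rough usage sketch for rtrs_clt_request() (illustrative only; my_io,
 * my_io_done, my_msg and the sg list are hypothetical, assuming an already
 * opened session @sess):
 *
 *	struct rtrs_clt_req_ops req_ops = {
 *		.priv = my_io,			// per-IO cookie
 *		.conf_fn = my_io_done,		// completion callback
 *	};
 *	struct kvec vec = { .iov_base = my_msg, .iov_len = my_msg_len };
 *	struct rtrs_permit *permit;
 *	int err;
 *
 *	permit = rtrs_clt_get_permit(sess, RTRS_IO_CON, RTRS_PERMIT_WAIT);
 *	err = rtrs_clt_request(READ, &req_ops, sess, permit, &vec, 1,
 *			       data_len, sg, sg_cnt);
 *	if (err)
 *		rtrs_clt_put_permit(sess, permit);
 */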
int rtrs_clt_rdma_cq_direct(struct rtrs_clt_sess *clt, unsigned int index)
{
	/* If no path, return -1 for the block layer not to try again */
	int cnt = -1;
	struct rtrs_con *con;
	struct rtrs_clt_path *clt_path;
	struct path_it it;

	rcu_read_lock();
	for (path_it_init(&it, clt);
	     (clt_path = it.next_path(&it)) && it.i < it.clt->paths_num; it.i++) {
		if (READ_ONCE(clt_path->state) != RTRS_CLT_CONNECTED)
			continue;

		con = clt_path->s.con[index + 1];
		cnt = ib_process_cq_direct(con->cq, -1);
		if (cnt)
			break;
	}
	path_it_deinit(&it);
	rcu_read_unlock();

	return cnt;
}
EXPORT_SYMBOL(rtrs_clt_rdma_cq_direct);
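/*
 * Note: s.con[0] carries the user (info/handshake) connection, so the
 * polling queue @index above maps to the IO connection at s.con[index + 1].
 */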
/**
 * rtrs_clt_query() - queries RTRS session attributes
 * @clt: session pointer
 * @attr: query results for session attributes.
 *
 * Returns:
 *    0 on success
 *    -ECOMM	no connection to the server
 */
int rtrs_clt_query(struct rtrs_clt_sess *clt, struct rtrs_attrs *attr)
{
	if (!rtrs_clt_is_connected(clt))
		return -ECOMM;

	attr->queue_depth = clt->queue_depth;
	attr->max_segments = clt->max_segments;
	/* Cap max_io_size to min of remote buffer size and the fr pages */
	attr->max_io_size = min_t(int, clt->max_io_size,
				  clt->max_segments * SZ_4K);

	return 0;
}
EXPORT_SYMBOL(rtrs_clt_query);
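/*
 * Example for the cap above: with 128 segments the reported max_io_size is
 * at most 128 * 4 KiB = 512 KiB, even if the remote buffers are larger.
 */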
int rtrs_clt_create_path_from_sysfs(struct rtrs_clt_sess *clt,
				    struct rtrs_addr *addr)
{
	struct rtrs_clt_path *clt_path;
	int err;

	clt_path = alloc_path(clt, addr, nr_cpu_ids, 0);
	if (IS_ERR(clt_path))
		return PTR_ERR(clt_path);

	mutex_lock(&clt->paths_mutex);
	if (clt->paths_num == 0) {
		/*
		 * When all the paths are removed for a session,
		 * the addition of the first path is like a new session for
		 * the storage server
		 */
		clt_path->for_new_clt = 1;
	}

	mutex_unlock(&clt->paths_mutex);

	/*
	 * It is totally safe to add path in CONNECTING state: coming
	 * IO will never grab it.  Also it is very important to add
	 * path before init, since init fires LINK_CONNECTED event.
	 */
	rtrs_clt_add_path_to_arr(clt_path);

	err = init_path(clt_path);
	if (err)
		goto close_path;

	err = rtrs_clt_create_path_files(clt_path);
	if (err)
		goto close_path;

	return 0;

close_path:
	rtrs_clt_remove_path_from_arr(clt_path);
	rtrs_clt_close_conns(clt_path, true);
	free_percpu(clt_path->stats->pcpu_stats);
	kfree(clt_path->stats);
	free_path(clt_path);

	return err;
}
void rtrs_clt_ib_event_handler(struct ib_event_handler *handler,
			       struct ib_event *ibevent)
{
	pr_info("Handling event: %s (%d).\n", ib_event_msg(ibevent->event),
		ibevent->event);
}

static int rtrs_clt_ib_dev_init(struct rtrs_ib_dev *dev)
{
	INIT_IB_EVENT_HANDLER(&dev->event_handler, dev->ib_dev,
			      rtrs_clt_ib_event_handler);
	ib_register_event_handler(&dev->event_handler);

	if (!(dev->ib_dev->attrs.device_cap_flags &
	      IB_DEVICE_MEM_MGT_EXTENSIONS)) {
		pr_err("Memory registrations not supported.\n");
		return -ENOTSUPP;
	}

	return 0;
}

static void rtrs_clt_ib_dev_deinit(struct rtrs_ib_dev *dev)
{
	ib_unregister_event_handler(&dev->event_handler);
}
static const struct rtrs_rdma_dev_pd_ops dev_pd_ops = {
	.init = rtrs_clt_ib_dev_init,
	.deinit = rtrs_clt_ib_dev_deinit
};

static int __init rtrs_client_init(void)
{
	int ret;

	rtrs_rdma_dev_pd_init(0, &dev_pd);
	ret = class_register(&rtrs_clt_dev_class);
	if (ret) {
		pr_err("Failed to create rtrs-client dev class\n");
		return ret;
	}
	rtrs_wq = alloc_workqueue("rtrs_client_wq", 0, 0);
	if (!rtrs_wq) {
		class_unregister(&rtrs_clt_dev_class);
		return -ENOMEM;
	}

	return 0;
}

static void __exit rtrs_client_exit(void)
{
	destroy_workqueue(rtrs_wq);
	class_unregister(&rtrs_clt_dev_class);
	rtrs_rdma_dev_pd_deinit(&dev_pd);
}

module_init(rtrs_client_init);
module_exit(rtrs_client_exit);