// SPDX-License-Identifier: GPL-2.0-or-later
/*
 * RDMA Transport Layer
 *
 * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
 * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
 * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
 */

#undef pr_fmt
#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt

#include <linux/module.h>
#include <linux/rculist.h>
#include <linux/random.h>

#include "rtrs-clt.h"
#include "rtrs-log.h"

#define RTRS_CONNECT_TIMEOUT_MS 30000
/*
 * Wait a bit before trying to reconnect after a failure
 * in order to give server time to finish clean up which
 * leads to "false positives" failed reconnect attempts
 */
#define RTRS_RECONNECT_BACKOFF 1000
/*
 * Wait for additional random time between 0 and 8 seconds
 * before starting to reconnect to avoid clients reconnecting
 * all at once in case of a major network outage
 */
#define RTRS_RECONNECT_SEED 8

MODULE_DESCRIPTION("RDMA Transport Client");
MODULE_LICENSE("GPL");
static const struct rtrs_rdma_dev_pd_ops dev_pd_ops;
static struct rtrs_rdma_dev_pd dev_pd = {
	.ops = &dev_pd_ops
};

static struct workqueue_struct *rtrs_wq;
static struct class *rtrs_clt_dev_class;
static inline bool rtrs_clt_is_connected(const struct rtrs_clt *clt)
{
	struct rtrs_clt_sess *sess;
	bool connected = false;

	rcu_read_lock();
	list_for_each_entry_rcu(sess, &clt->paths_list, s.entry)
		connected |= READ_ONCE(sess->state) == RTRS_CLT_CONNECTED;
	rcu_read_unlock();

	return connected;
}
static struct rtrs_permit *
__rtrs_get_permit(struct rtrs_clt *clt, enum rtrs_clt_con_type con_type)
{
	size_t max_depth = clt->queue_depth;
	struct rtrs_permit *permit;
	int bit;

	/*
	 * Adapted from null_blk get_tag(). Callers from different cpus may
	 * grab the same bit, since find_first_zero_bit is not atomic.
	 * But then the test_and_set_bit_lock will fail for all the
	 * callers but one, so that they will loop again.
	 * This way an explicit spinlock is not required.
	 */
	do {
		bit = find_first_zero_bit(clt->permits_map, max_depth);
		if (unlikely(bit >= max_depth))
			return NULL;
	} while (unlikely(test_and_set_bit_lock(bit, clt->permits_map)));

	permit = get_permit(clt, bit);
	WARN_ON(permit->mem_id != bit);
	permit->cpu_id = raw_smp_processor_id();
	permit->con_type = con_type;

	return permit;
}
static inline void __rtrs_put_permit(struct rtrs_clt *clt,
				     struct rtrs_permit *permit)
{
	clear_bit_unlock(permit->mem_id, clt->permits_map);
}
/**
 * rtrs_clt_get_permit() - allocates permit for future RDMA operation
 * @clt:	Current session
 * @con_type:	Type of connection to use with the permit
 * @can_wait:	Wait type
 *
 * Description:
 *    Allocates permit for the following RDMA operation.  Permit is used
 *    to preallocate all resources and to propagate memory pressure.
 *
 * Context:
 *    Can sleep if @wait == RTRS_TAG_WAIT
 */
struct rtrs_permit *rtrs_clt_get_permit(struct rtrs_clt *clt,
					enum rtrs_clt_con_type con_type,
					int can_wait)
{
	struct rtrs_permit *permit;
	DEFINE_WAIT(wait);

	permit = __rtrs_get_permit(clt, con_type);
	if (likely(permit) || !can_wait)
		return permit;

	do {
		prepare_to_wait(&clt->permits_wait, &wait,
				TASK_UNINTERRUPTIBLE);
		permit = __rtrs_get_permit(clt, con_type);
		if (likely(permit))
			break;

		schedule();
	} while (1);

	finish_wait(&clt->permits_wait, &wait);

	return permit;
}
EXPORT_SYMBOL(rtrs_clt_get_permit);
/**
 * rtrs_clt_put_permit() - puts allocated permit
 * @clt:	Current session
 * @permit:	Permit to be freed
 */
void rtrs_clt_put_permit(struct rtrs_clt *clt, struct rtrs_permit *permit)
{
	if (WARN_ON(!test_bit(permit->mem_id, clt->permits_map)))
		return;

	__rtrs_put_permit(clt, permit);

	/*
	 * rtrs_clt_get_permit() adds itself to the &clt->permits_wait list
	 * before calling schedule(). So if rtrs_clt_get_permit() is sleeping
	 * it must have added itself to &clt->permits_wait before
	 * __rtrs_put_permit() finished.
	 * Hence it is safe to guard wake_up() with a waitqueue_active() test.
	 */
	if (waitqueue_active(&clt->permits_wait))
		wake_up(&clt->permits_wait);
}
EXPORT_SYMBOL(rtrs_clt_put_permit);
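
/*
 * Typical caller flow (sketch only, not lifted verbatim from an in-tree
 * user): a ULP grabs a permit with rtrs_clt_get_permit(clt, RTRS_IO_CON,
 * can_wait) before issuing an RDMA request and returns it with
 * rtrs_clt_put_permit(clt, permit) from the request's confirmation callback
 * once the IO has completed.
 */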
/**
 * rtrs_permit_to_clt_con() - returns RDMA connection pointer by the permit
 * @sess: client session pointer
 * @permit: permit for the allocation of the RDMA buffer
 * Note:
 *     IO connection starts from 1.
 *     0 connection is for user messages.
 */
static struct rtrs_clt_con *rtrs_permit_to_clt_con(struct rtrs_clt_sess *sess,
						   struct rtrs_permit *permit)
{
	int id = 0;

	if (likely(permit->con_type == RTRS_IO_CON))
		id = (permit->cpu_id % (sess->s.con_num - 1)) + 1;

	return to_clt_con(sess->s.con[id]);
}
/**
 * __rtrs_clt_change_state() - change the session state through session state
 * machine.
 *
 * @sess: client session to change the state of.
 * @new_state: state to change to.
 *
 * returns true if successful, false if the requested state can not be set.
 *
 * Locks:
 * state_wq lock must be held.
 */
static bool __rtrs_clt_change_state(struct rtrs_clt_sess *sess,
				    enum rtrs_clt_state new_state)
{
	enum rtrs_clt_state old_state;
	bool changed = false;

	lockdep_assert_held(&sess->state_wq.lock);

	old_state = sess->state;
	switch (new_state) {
	case RTRS_CLT_CONNECTING:
		switch (old_state) {
		case RTRS_CLT_RECONNECTING:
			changed = true;
			fallthrough;
		default:
			break;
		}
		break;
	case RTRS_CLT_RECONNECTING:
		switch (old_state) {
		case RTRS_CLT_CONNECTED:
		case RTRS_CLT_CONNECTING_ERR:
		case RTRS_CLT_CLOSED:
			changed = true;
			fallthrough;
		default:
			break;
		}
		break;
	case RTRS_CLT_CONNECTED:
		switch (old_state) {
		case RTRS_CLT_CONNECTING:
			changed = true;
			fallthrough;
		default:
			break;
		}
		break;
	case RTRS_CLT_CONNECTING_ERR:
		switch (old_state) {
		case RTRS_CLT_CONNECTING:
			changed = true;
			fallthrough;
		default:
			break;
		}
		break;
	case RTRS_CLT_CLOSING:
		switch (old_state) {
		case RTRS_CLT_CONNECTING:
		case RTRS_CLT_CONNECTING_ERR:
		case RTRS_CLT_RECONNECTING:
		case RTRS_CLT_CONNECTED:
			changed = true;
			fallthrough;
		default:
			break;
		}
		break;
	case RTRS_CLT_CLOSED:
		switch (old_state) {
		case RTRS_CLT_CLOSING:
			changed = true;
			fallthrough;
		default:
			break;
		}
		break;
	case RTRS_CLT_DEAD:
		switch (old_state) {
		case RTRS_CLT_CLOSED:
			changed = true;
			fallthrough;
		default:
			break;
		}
		break;
	default:
		break;
	}
	if (changed) {
		sess->state = new_state;
		wake_up_locked(&sess->state_wq);
	}

	return changed;
}
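
/*
 * Summary of the transitions accepted above: RECONNECTING -> CONNECTING,
 * {CONNECTED, CONNECTING_ERR, CLOSED} -> RECONNECTING, CONNECTING ->
 * {CONNECTED, CONNECTING_ERR}, {CONNECTING, CONNECTING_ERR, RECONNECTING,
 * CONNECTED} -> CLOSING, CLOSING -> CLOSED and CLOSED -> DEAD; every other
 * transition is rejected and the state is left untouched.
 */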
static bool rtrs_clt_change_state_from_to(struct rtrs_clt_sess *sess,
					  enum rtrs_clt_state old_state,
					  enum rtrs_clt_state new_state)
{
	bool changed = false;

	spin_lock_irq(&sess->state_wq.lock);
	if (sess->state == old_state)
		changed = __rtrs_clt_change_state(sess, new_state);
	spin_unlock_irq(&sess->state_wq.lock);

	return changed;
}
static void rtrs_rdma_error_recovery(struct rtrs_clt_con *con)
{
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);

	if (rtrs_clt_change_state_from_to(sess,
					  RTRS_CLT_CONNECTED,
					  RTRS_CLT_RECONNECTING)) {
		struct rtrs_clt *clt = sess->clt;
		unsigned int delay_ms;

		/*
		 * Normal scenario, reconnect if we were successfully connected
		 */
		delay_ms = clt->reconnect_delay_sec * 1000;
		queue_delayed_work(rtrs_wq, &sess->reconnect_dwork,
				   msecs_to_jiffies(delay_ms +
						    prandom_u32() % RTRS_RECONNECT_SEED));
	} else {
		/*
		 * Error can happen just on establishing new connection,
		 * so notify waiter with error state, waiter is responsible
		 * for cleaning the rest and reconnect if needed.
		 */
		rtrs_clt_change_state_from_to(sess,
					      RTRS_CLT_CONNECTING,
					      RTRS_CLT_CONNECTING_ERR);
	}
}
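
/*
 * Example: with reconnect_delay_sec == 5 the reconnect work above is queued
 * roughly 5000 ms after the error, plus a pseudo-random jitter derived from
 * RTRS_RECONNECT_SEED so that many paths failing at the same moment do not
 * all try to reconnect in the same instant.
 */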
static void rtrs_clt_fast_reg_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rtrs_clt_con *con = cq->cq_context;

	if (unlikely(wc->status != IB_WC_SUCCESS)) {
		rtrs_err(con->c.sess, "Failed IB_WR_REG_MR: %s\n",
			 ib_wc_status_msg(wc->status));
		rtrs_rdma_error_recovery(con);
	}
}

static struct ib_cqe fast_reg_cqe = {
	.done = rtrs_clt_fast_reg_done
};
static void complete_rdma_req(struct rtrs_clt_io_req *req, int errno,
			      bool notify, bool can_wait);
static void rtrs_clt_inv_rkey_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rtrs_clt_io_req *req =
		container_of(wc->wr_cqe, typeof(*req), inv_cqe);
	struct rtrs_clt_con *con = cq->cq_context;

	if (unlikely(wc->status != IB_WC_SUCCESS)) {
		rtrs_err(con->c.sess, "Failed IB_WR_LOCAL_INV: %s\n",
			 ib_wc_status_msg(wc->status));
		rtrs_rdma_error_recovery(con);
	}
	req->need_inv = false;
	if (likely(req->need_inv_comp))
		complete(&req->inv_comp);
	else
		/* Complete request from INV callback */
		complete_rdma_req(req, req->inv_errno, true, false);
}
static int rtrs_inv_rkey(struct rtrs_clt_io_req *req)
{
	struct rtrs_clt_con *con = req->con;
	struct ib_send_wr wr = {
		.opcode		    = IB_WR_LOCAL_INV,
		.wr_cqe		    = &req->inv_cqe,
		.send_flags	    = IB_SEND_SIGNALED,
		.ex.invalidate_rkey = req->mr->rkey,
	};
	req->inv_cqe.done = rtrs_clt_inv_rkey_done;

	return ib_post_send(con->c.qp, &wr, NULL);
}
static void complete_rdma_req(struct rtrs_clt_io_req *req, int errno,
			      bool notify, bool can_wait)
{
	struct rtrs_clt_con *con = req->con;
	struct rtrs_clt_sess *sess;
	int err;

	if (WARN_ON(!req->in_use))
		return;
	if (WARN_ON(!req->con))
		return;
	sess = to_clt_sess(con->c.sess);

	if (req->sg_cnt) {
		if (unlikely(req->dir == DMA_FROM_DEVICE && req->need_inv)) {
			/*
			 * We are here to invalidate read requests
			 * ourselves.  In normal scenario server should
			 * send INV for all read requests, but
			 * we are here, thus two things could happen:
			 *
			 *        1.  this is failover, when errno != 0
			 *            and can_wait == 1,
			 *
			 *        2.  something totally bad happened and
			 *            server forgot to send INV, so we
			 *            should do that ourselves.
			 */
			if (likely(can_wait)) {
				req->need_inv_comp = true;
			} else {
				/* This should be IO path, so always notify */
				WARN_ON(!notify);
				/* Save errno for INV callback */
				req->inv_errno = errno;
			}

			err = rtrs_inv_rkey(req);
			if (unlikely(err)) {
				rtrs_err(con->c.sess, "Send INV WR key=%#x: %d\n",
					 req->mr->rkey, err);
			} else if (likely(can_wait)) {
				wait_for_completion(&req->inv_comp);
			} else {
				/*
				 * Something went wrong, so request will be
				 * completed from INV callback.
				 */
				return;
			}
		}
		ib_dma_unmap_sg(sess->s.dev->ib_dev, req->sglist,
				req->sg_cnt, req->dir);
	}
	if (sess->clt->mp_policy == MP_POLICY_MIN_INFLIGHT)
		atomic_dec(&sess->stats->inflight);

	req->in_use = false;
	req->con = NULL;

	if (notify)
		req->conf(req->priv, errno);
}
static int rtrs_post_send_rdma(struct rtrs_clt_con *con,
			       struct rtrs_clt_io_req *req,
			       struct rtrs_rbuf *rbuf, u32 off,
			       u32 imm, struct ib_send_wr *wr)
{
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
	enum ib_send_flags flags;
	struct ib_sge sge;

	if (unlikely(!req->sg_size)) {
		rtrs_wrn(con->c.sess,
			 "Doing RDMA Write failed, no data supplied\n");
		return -EINVAL;
	}

	/* user data and user message in the first list element */
	sge.addr   = req->iu->dma_addr;
	sge.length = req->sg_size;
	sge.lkey   = sess->s.dev->ib_pd->local_dma_lkey;

	/*
	 * From time to time we have to post signalled sends,
	 * or send queue will fill up and only QP reset can help.
	 */
	flags = atomic_inc_return(&con->io_cnt) % sess->queue_depth ?
			0 : IB_SEND_SIGNALED;

	ib_dma_sync_single_for_device(sess->s.dev->ib_dev, req->iu->dma_addr,
				      req->sg_size, DMA_TO_DEVICE);

	return rtrs_iu_post_rdma_write_imm(&con->c, req->iu, &sge, 1,
					   rbuf->rkey, rbuf->addr + off,
					   imm, flags, wr);
}
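
/*
 * Note on the flags computation above: io_cnt is a per-connection counter,
 * so exactly one out of every queue_depth posted sends is IB_SEND_SIGNALED.
 * That single signalled completion lets the HCA retire the whole batch of
 * preceding unsignalled send WRs, which is what keeps the send queue from
 * slowly filling up.
 */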
static void process_io_rsp(struct rtrs_clt_sess *sess, u32 msg_id,
			   s16 errno, bool w_inval)
{
	struct rtrs_clt_io_req *req;

	if (WARN_ON(msg_id >= sess->queue_depth))
		return;

	req = &sess->reqs[msg_id];
	/* Drop need_inv if server responded with send with invalidation */
	req->need_inv &= !w_inval;
	complete_rdma_req(req, errno, true, false);
}
static void rtrs_clt_recv_done(struct rtrs_clt_con *con, struct ib_wc *wc)
{
	struct rtrs_iu *iu;
	int err;
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);

	WARN_ON(sess->flags != RTRS_MSG_NEW_RKEY_F);
	iu = container_of(wc->wr_cqe, struct rtrs_iu,
			  cqe);
	err = rtrs_iu_post_recv(&con->c, iu);
	if (unlikely(err)) {
		rtrs_err(con->c.sess, "post iu failed %d\n", err);
		rtrs_rdma_error_recovery(con);
	}
}
static void rtrs_clt_rkey_rsp_done(struct rtrs_clt_con *con, struct ib_wc *wc)
{
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
	struct rtrs_msg_rkey_rsp *msg;
	u32 imm_type, imm_payload;
	bool w_inval = false;
	struct rtrs_iu *iu;
	u32 buf_id;
	int err;

	WARN_ON(sess->flags != RTRS_MSG_NEW_RKEY_F);

	iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);

	if (unlikely(wc->byte_len < sizeof(*msg))) {
		rtrs_err(con->c.sess, "rkey response is malformed: size %d\n",
			 wc->byte_len);
		goto out;
	}
	ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, iu->dma_addr,
				   iu->size, DMA_FROM_DEVICE);
	msg = iu->buf;
	if (unlikely(le16_to_cpu(msg->type) != RTRS_MSG_RKEY_RSP)) {
		rtrs_err(sess->clt, "rkey response is malformed: type %d\n",
			 le16_to_cpu(msg->type));
		goto out;
	}
	buf_id = le16_to_cpu(msg->buf_id);
	if (WARN_ON(buf_id >= sess->queue_depth))
		goto out;

	rtrs_from_imm(be32_to_cpu(wc->ex.imm_data), &imm_type, &imm_payload);
	if (likely(imm_type == RTRS_IO_RSP_IMM ||
		   imm_type == RTRS_IO_RSP_W_INV_IMM)) {
		u32 msg_id;

		w_inval = (imm_type == RTRS_IO_RSP_W_INV_IMM);
		rtrs_from_io_rsp_imm(imm_payload, &msg_id, &err);

		if (WARN_ON(buf_id != msg_id))
			goto out;
		sess->rbufs[buf_id].rkey = le32_to_cpu(msg->rkey);
		process_io_rsp(sess, msg_id, err, w_inval);
	}
	ib_dma_sync_single_for_device(sess->s.dev->ib_dev, iu->dma_addr,
				      iu->size, DMA_FROM_DEVICE);
	return rtrs_clt_recv_done(con, wc);
out:
	rtrs_rdma_error_recovery(con);
}
static void rtrs_clt_rdma_done(struct ib_cq *cq, struct ib_wc *wc);

static struct ib_cqe io_comp_cqe = {
	.done = rtrs_clt_rdma_done
};
/*
 * Post x2 empty WRs: first is for this RDMA with IMM,
 * second is for RECV with INV, which happened earlier.
 */
static int rtrs_post_recv_empty_x2(struct rtrs_con *con, struct ib_cqe *cqe)
{
	struct ib_recv_wr wr_arr[2], *wr;
	int i;

	memset(wr_arr, 0, sizeof(wr_arr));
	for (i = 0; i < ARRAY_SIZE(wr_arr); i++) {
		wr = &wr_arr[i];
		wr->wr_cqe = cqe;
		if (i)
			/* Chain backwards */
			wr->next = &wr_arr[i - 1];
	}

	return ib_post_recv(con->qp, wr, NULL);
}
static void rtrs_clt_rdma_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rtrs_clt_con *con = cq->cq_context;
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
	u32 imm_type, imm_payload;
	bool w_inval = false;
	int err;

	if (unlikely(wc->status != IB_WC_SUCCESS)) {
		if (wc->status != IB_WC_WR_FLUSH_ERR) {
			rtrs_err(sess->clt, "RDMA failed: %s\n",
				 ib_wc_status_msg(wc->status));
			rtrs_rdma_error_recovery(con);
		}
		return;
	}
	rtrs_clt_update_wc_stats(con);

	switch (wc->opcode) {
	case IB_WC_RECV_RDMA_WITH_IMM:
		/*
		 * post_recv() RDMA write completions of IO reqs (read/write)
		 */
		if (WARN_ON(wc->wr_cqe->done != rtrs_clt_rdma_done))
			return;
		rtrs_from_imm(be32_to_cpu(wc->ex.imm_data),
			      &imm_type, &imm_payload);
		if (likely(imm_type == RTRS_IO_RSP_IMM ||
			   imm_type == RTRS_IO_RSP_W_INV_IMM)) {
			u32 msg_id;

			w_inval = (imm_type == RTRS_IO_RSP_W_INV_IMM);
			rtrs_from_io_rsp_imm(imm_payload, &msg_id, &err);

			process_io_rsp(sess, msg_id, err, w_inval);
		} else if (imm_type == RTRS_HB_MSG_IMM) {
			rtrs_send_hb_ack(&sess->s);
			if (sess->flags == RTRS_MSG_NEW_RKEY_F)
				return rtrs_clt_recv_done(con, wc);
		} else if (imm_type == RTRS_HB_ACK_IMM) {
			sess->s.hb_missed_cnt = 0;
			if (sess->flags == RTRS_MSG_NEW_RKEY_F)
				return rtrs_clt_recv_done(con, wc);
		} else {
			rtrs_wrn(con->c.sess, "Unknown IMM type %u\n",
				 imm_type);
		}
		if (w_inval)
			/*
			 * Post x2 empty WRs: first is for this RDMA with IMM,
			 * second is for RECV with INV, which happened earlier.
			 */
			err = rtrs_post_recv_empty_x2(&con->c, &io_comp_cqe);
		else
			err = rtrs_post_recv_empty(&con->c, &io_comp_cqe);
		if (unlikely(err)) {
			rtrs_err(con->c.sess, "rtrs_post_recv_empty(): %d\n",
				 err);
			rtrs_rdma_error_recovery(con);
		}
		break;
	case IB_WC_RECV:
		/*
		 * Key invalidations from server side
		 */
		WARN_ON(!(wc->wc_flags & IB_WC_WITH_INVALIDATE ||
			  wc->wc_flags & IB_WC_WITH_IMM));
		WARN_ON(wc->wr_cqe->done != rtrs_clt_rdma_done);
		if (sess->flags == RTRS_MSG_NEW_RKEY_F) {
			if (wc->wc_flags & IB_WC_WITH_INVALIDATE)
				return rtrs_clt_recv_done(con, wc);

			return rtrs_clt_rkey_rsp_done(con, wc);
		}
		break;
	case IB_WC_RDMA_WRITE:
		/*
		 * post_send() RDMA write completions of IO reqs (read/write)
		 */
		break;

	default:
		rtrs_wrn(sess->clt, "Unexpected WC type: %d\n", wc->opcode);
		return;
	}
}
static int post_recv_io(struct rtrs_clt_con *con, size_t q_size)
{
	int err, i;
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);

	for (i = 0; i < q_size; i++) {
		if (sess->flags == RTRS_MSG_NEW_RKEY_F) {
			struct rtrs_iu *iu = &con->rsp_ius[i];

			err = rtrs_iu_post_recv(&con->c, iu);
		} else {
			err = rtrs_post_recv_empty(&con->c, &io_comp_cqe);
		}
		if (unlikely(err))
			return err;
	}

	return 0;
}
static int post_recv_sess(struct rtrs_clt_sess *sess)
{
	size_t q_size = 0;
	int err, cid;

	for (cid = 0; cid < sess->s.con_num; cid++) {
		if (cid == 0)
			q_size = SERVICE_CON_QUEUE_DEPTH;
		else
			q_size = sess->queue_depth;

		/*
		 * x2 for RDMA read responses + FR key invalidations,
		 * RDMA writes do not require any FR registrations.
		 */
		q_size *= 2;

		err = post_recv_io(to_clt_con(sess->s.con[cid]), q_size);
		if (unlikely(err)) {
			rtrs_err(sess->clt, "post_recv_io(), err: %d\n", err);
			return err;
		}
	}

	return 0;
}
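
/*
 * Example: with a negotiated queue_depth of 512, each IO connection posts
 * 2 * 512 = 1024 receive WRs above, while the user connection (cid == 0)
 * posts 2 * SERVICE_CON_QUEUE_DEPTH of them.
 */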
struct path_it {
	int i;
	struct list_head skip_list;
	struct rtrs_clt *clt;
	struct rtrs_clt_sess *(*next_path)(struct path_it *it);
};

/**
 * list_next_or_null_rr_rcu - get next list element in round-robin fashion.
 * @head:	the head for the list.
 * @ptr:	the list head to take the next element from.
 * @type:	the type of the struct this is embedded in.
 * @memb:	the name of the list_head within the struct.
 *
 * Next element returned in round-robin fashion, i.e. head will be skipped,
 * but if list is observed as empty, NULL will be returned.
 *
 * This primitive may safely run concurrently with the _rcu list-mutation
 * primitives such as list_add_rcu() as long as it's guarded by rcu_read_lock().
 */
#define list_next_or_null_rr_rcu(head, ptr, type, memb) \
({ \
	list_next_or_null_rcu(head, ptr, type, memb) ?: \
	    list_next_or_null_rcu(head, READ_ONCE((ptr)->next), \
				  type, memb); \
})
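
/*
 * Usage sketch (mirroring get_next_path_rr() below): starting from the
 * currently cached path, the macro yields the following entry of
 * clt->paths_list and transparently wraps over the list head, e.g.
 *
 *	path = list_next_or_null_rr_rcu(&clt->paths_list, &path->s.entry,
 *					typeof(*path), s.entry);
 *
 * and returns NULL only when the list itself is observed as empty.
 */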
/**
 * get_next_path_rr() - Returns path in round-robin fashion.
 * @it:	the path pointer
 *
 * Related to @MP_POLICY_RR
 *
 * Locks:
 *    rcu_read_lock() must be held.
 */
static struct rtrs_clt_sess *get_next_path_rr(struct path_it *it)
{
	struct rtrs_clt_sess __rcu **ppcpu_path;
	struct rtrs_clt_sess *path;
	struct rtrs_clt *clt;

	clt = it->clt;

	/*
	 * Here we use two RCU objects: @paths_list and @pcpu_path
	 * pointer.  See rtrs_clt_remove_path_from_arr() for details
	 * how that is handled.
	 */

	ppcpu_path = this_cpu_ptr(clt->pcpu_path);
	path = rcu_dereference(*ppcpu_path);
	if (unlikely(!path))
		path = list_first_or_null_rcu(&clt->paths_list,
					      typeof(*path), s.entry);
	else
		path = list_next_or_null_rr_rcu(&clt->paths_list,
						&path->s.entry,
						typeof(*path),
						s.entry);
	rcu_assign_pointer(*ppcpu_path, path);

	return path;
}
/**
 * get_next_path_min_inflight() - Returns path with minimal inflight count.
 * @it:	the path pointer
 *
 * Related to @MP_POLICY_MIN_INFLIGHT
 *
 * Locks:
 *    rcu_read_lock() must be held.
 */
static struct rtrs_clt_sess *get_next_path_min_inflight(struct path_it *it)
{
	struct rtrs_clt_sess *min_path = NULL;
	struct rtrs_clt *clt = it->clt;
	struct rtrs_clt_sess *sess;
	int min_inflight = INT_MAX;
	int inflight;

	list_for_each_entry_rcu(sess, &clt->paths_list, s.entry) {
		if (unlikely(!list_empty(raw_cpu_ptr(sess->mp_skip_entry))))
			continue;

		inflight = atomic_read(&sess->stats->inflight);

		if (inflight < min_inflight) {
			min_inflight = inflight;
			min_path = sess;
		}
	}

	/*
	 * add the path to the skip list, so that next time we can get
	 * a different one
	 */
	if (min_path)
		list_add(raw_cpu_ptr(min_path->mp_skip_entry), &it->skip_list);

	return min_path;
}
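
/*
 * Note: the per-CPU mp_skip_entry added to the skip list above is only
 * unlinked again in path_it_deinit(), so within a single path iteration each
 * path is offered at most once even if its inflight counter stays the
 * smallest.
 */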
static inline void path_it_init(struct path_it *it, struct rtrs_clt *clt)
{
	INIT_LIST_HEAD(&it->skip_list);
	it->clt = clt;
	it->i = 0;

	if (clt->mp_policy == MP_POLICY_RR)
		it->next_path = get_next_path_rr;
	else
		it->next_path = get_next_path_min_inflight;
}

static inline void path_it_deinit(struct path_it *it)
{
	struct list_head *skip, *tmp;
	/*
	 * The skip_list is used only for the MIN_INFLIGHT policy.
	 * We need to remove paths from it, so that next IO can insert
	 * paths (->mp_skip_entry) into a skip_list again.
	 */
	list_for_each_safe(skip, tmp, &it->skip_list)
		list_del_init(skip);
}
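
/*
 * Typical iteration over the paths (sketch, mirroring rtrs_clt_failover_req()
 * further below): under rcu_read_lock() the caller does path_it_init(&it, clt),
 * loops while it.next_path(&it) returns a path and it.i < clt->paths_num,
 * and finally calls path_it_deinit(&it).
 */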
853 * rtrs_clt_init_req() Initialize an rtrs_clt_io_req holding information
854 * about an inflight IO.
855 * The user buffer holding user control message (not data) is copied into
856 * the corresponding buffer of rtrs_iu (req->iu->buf), which later on will
857 * also hold the control message of rtrs.
858 * @req: an io request holding information about IO.
859 * @sess: client session
860 * @conf: conformation callback function to notify upper layer.
861 * @permit: permit for allocation of RDMA remote buffer
862 * @priv: private pointer
863 * @vec: kernel vector containing control message
864 * @usr_len: length of the user message
865 * @sg: scater list for IO data
866 * @sg_cnt: number of scater list entries
867 * @data_len: length of the IO data
868 * @dir: direction of the IO.
870 static void rtrs_clt_init_req(struct rtrs_clt_io_req
*req
,
871 struct rtrs_clt_sess
*sess
,
872 void (*conf
)(void *priv
, int errno
),
873 struct rtrs_permit
*permit
, void *priv
,
874 const struct kvec
*vec
, size_t usr_len
,
875 struct scatterlist
*sg
, size_t sg_cnt
,
876 size_t data_len
, int dir
)
878 struct iov_iter iter
;
881 req
->permit
= permit
;
883 req
->usr_len
= usr_len
;
884 req
->data_len
= data_len
;
886 req
->sg_cnt
= sg_cnt
;
889 req
->con
= rtrs_permit_to_clt_con(sess
, permit
);
891 req
->need_inv
= false;
892 req
->need_inv_comp
= false;
895 iov_iter_kvec(&iter
, READ
, vec
, 1, usr_len
);
896 len
= _copy_from_iter(req
->iu
->buf
, usr_len
, &iter
);
897 WARN_ON(len
!= usr_len
);
899 reinit_completion(&req
->inv_comp
);
902 static struct rtrs_clt_io_req
*
903 rtrs_clt_get_req(struct rtrs_clt_sess
*sess
,
904 void (*conf
)(void *priv
, int errno
),
905 struct rtrs_permit
*permit
, void *priv
,
906 const struct kvec
*vec
, size_t usr_len
,
907 struct scatterlist
*sg
, size_t sg_cnt
,
908 size_t data_len
, int dir
)
910 struct rtrs_clt_io_req
*req
;
912 req
= &sess
->reqs
[permit
->mem_id
];
913 rtrs_clt_init_req(req
, sess
, conf
, permit
, priv
, vec
, usr_len
,
914 sg
, sg_cnt
, data_len
, dir
);
918 static struct rtrs_clt_io_req
*
919 rtrs_clt_get_copy_req(struct rtrs_clt_sess
*alive_sess
,
920 struct rtrs_clt_io_req
*fail_req
)
922 struct rtrs_clt_io_req
*req
;
924 .iov_base
= fail_req
->iu
->buf
,
925 .iov_len
= fail_req
->usr_len
928 req
= &alive_sess
->reqs
[fail_req
->permit
->mem_id
];
929 rtrs_clt_init_req(req
, alive_sess
, fail_req
->conf
, fail_req
->permit
,
930 fail_req
->priv
, &vec
, fail_req
->usr_len
,
931 fail_req
->sglist
, fail_req
->sg_cnt
,
932 fail_req
->data_len
, fail_req
->dir
);
936 static int rtrs_post_rdma_write_sg(struct rtrs_clt_con
*con
,
937 struct rtrs_clt_io_req
*req
,
938 struct rtrs_rbuf
*rbuf
,
941 struct rtrs_clt_sess
*sess
= to_clt_sess(con
->c
.sess
);
942 struct ib_sge
*sge
= req
->sge
;
943 enum ib_send_flags flags
;
944 struct scatterlist
*sg
;
948 for_each_sg(req
->sglist
, sg
, req
->sg_cnt
, i
) {
949 sge
[i
].addr
= sg_dma_address(sg
);
950 sge
[i
].length
= sg_dma_len(sg
);
951 sge
[i
].lkey
= sess
->s
.dev
->ib_pd
->local_dma_lkey
;
953 sge
[i
].addr
= req
->iu
->dma_addr
;
954 sge
[i
].length
= size
;
955 sge
[i
].lkey
= sess
->s
.dev
->ib_pd
->local_dma_lkey
;
957 num_sge
= 1 + req
->sg_cnt
;
960 * From time to time we have to post signalled sends,
961 * or send queue will fill up and only QP reset can help.
963 flags
= atomic_inc_return(&con
->io_cnt
) % sess
->queue_depth
?
964 0 : IB_SEND_SIGNALED
;
966 ib_dma_sync_single_for_device(sess
->s
.dev
->ib_dev
, req
->iu
->dma_addr
,
967 size
, DMA_TO_DEVICE
);
969 return rtrs_iu_post_rdma_write_imm(&con
->c
, req
->iu
, sge
, num_sge
,
970 rbuf
->rkey
, rbuf
->addr
, imm
,
974 static int rtrs_clt_write_req(struct rtrs_clt_io_req
*req
)
976 struct rtrs_clt_con
*con
= req
->con
;
977 struct rtrs_sess
*s
= con
->c
.sess
;
978 struct rtrs_clt_sess
*sess
= to_clt_sess(s
);
979 struct rtrs_msg_rdma_write
*msg
;
981 struct rtrs_rbuf
*rbuf
;
985 const size_t tsize
= sizeof(*msg
) + req
->data_len
+ req
->usr_len
;
987 if (unlikely(tsize
> sess
->chunk_size
)) {
988 rtrs_wrn(s
, "Write request failed, size too big %zu > %d\n",
989 tsize
, sess
->chunk_size
);
993 count
= ib_dma_map_sg(sess
->s
.dev
->ib_dev
, req
->sglist
,
994 req
->sg_cnt
, req
->dir
);
995 if (unlikely(!count
)) {
996 rtrs_wrn(s
, "Write request failed, map failed\n");
1000 /* put rtrs msg after sg and user message */
1001 msg
= req
->iu
->buf
+ req
->usr_len
;
1002 msg
->type
= cpu_to_le16(RTRS_MSG_WRITE
);
1003 msg
->usr_len
= cpu_to_le16(req
->usr_len
);
1005 /* rtrs message on server side will be after user data and message */
1006 imm
= req
->permit
->mem_off
+ req
->data_len
+ req
->usr_len
;
1007 imm
= rtrs_to_io_req_imm(imm
);
1008 buf_id
= req
->permit
->mem_id
;
1009 req
->sg_size
= tsize
;
1010 rbuf
= &sess
->rbufs
[buf_id
];
1013 * Update stats now, after request is successfully sent it is not
1014 * safe anymore to touch it.
1016 rtrs_clt_update_all_stats(req
, WRITE
);
1018 ret
= rtrs_post_rdma_write_sg(req
->con
, req
, rbuf
,
1019 req
->usr_len
+ sizeof(*msg
),
1021 if (unlikely(ret
)) {
1022 rtrs_err(s
, "Write request failed: %d\n", ret
);
1023 if (sess
->clt
->mp_policy
== MP_POLICY_MIN_INFLIGHT
)
1024 atomic_dec(&sess
->stats
->inflight
);
1026 ib_dma_unmap_sg(sess
->s
.dev
->ib_dev
, req
->sglist
,
1027 req
->sg_cnt
, req
->dir
);
1033 static int rtrs_map_sg_fr(struct rtrs_clt_io_req
*req
, size_t count
)
1037 /* Align the MR to a 4K page size to match the block virt boundary */
1038 nr
= ib_map_mr_sg(req
->mr
, req
->sglist
, count
, NULL
, SZ_4K
);
1041 if (unlikely(nr
< req
->sg_cnt
))
1043 ib_update_fast_reg_key(req
->mr
, ib_inc_rkey(req
->mr
->rkey
));
1048 static int rtrs_clt_read_req(struct rtrs_clt_io_req
*req
)
1050 struct rtrs_clt_con
*con
= req
->con
;
1051 struct rtrs_sess
*s
= con
->c
.sess
;
1052 struct rtrs_clt_sess
*sess
= to_clt_sess(s
);
1053 struct rtrs_msg_rdma_read
*msg
;
1054 struct rtrs_ib_dev
*dev
;
1056 struct ib_reg_wr rwr
;
1057 struct ib_send_wr
*wr
= NULL
;
1062 const size_t tsize
= sizeof(*msg
) + req
->data_len
+ req
->usr_len
;
1067 if (unlikely(tsize
> sess
->chunk_size
)) {
1069 "Read request failed, message size is %zu, bigger than CHUNK_SIZE %d\n",
1070 tsize
, sess
->chunk_size
);
1075 count
= ib_dma_map_sg(dev
->ib_dev
, req
->sglist
, req
->sg_cnt
,
1077 if (unlikely(!count
)) {
1079 "Read request failed, dma map failed\n");
1083 /* put our message into req->buf after user message*/
1084 msg
= req
->iu
->buf
+ req
->usr_len
;
1085 msg
->type
= cpu_to_le16(RTRS_MSG_READ
);
1086 msg
->usr_len
= cpu_to_le16(req
->usr_len
);
1089 ret
= rtrs_map_sg_fr(req
, count
);
1092 "Read request failed, failed to map fast reg. data, err: %d\n",
1094 ib_dma_unmap_sg(dev
->ib_dev
, req
->sglist
, req
->sg_cnt
,
1098 rwr
= (struct ib_reg_wr
) {
1099 .wr
.opcode
= IB_WR_REG_MR
,
1100 .wr
.wr_cqe
= &fast_reg_cqe
,
1102 .key
= req
->mr
->rkey
,
1103 .access
= (IB_ACCESS_LOCAL_WRITE
|
1104 IB_ACCESS_REMOTE_WRITE
),
1108 msg
->sg_cnt
= cpu_to_le16(1);
1109 msg
->flags
= cpu_to_le16(RTRS_MSG_NEED_INVAL_F
);
1111 msg
->desc
[0].addr
= cpu_to_le64(req
->mr
->iova
);
1112 msg
->desc
[0].key
= cpu_to_le32(req
->mr
->rkey
);
1113 msg
->desc
[0].len
= cpu_to_le32(req
->mr
->length
);
1115 /* Further invalidation is required */
1116 req
->need_inv
= !!RTRS_MSG_NEED_INVAL_F
;
1123 * rtrs message will be after the space reserved for disk data and
1126 imm
= req
->permit
->mem_off
+ req
->data_len
+ req
->usr_len
;
1127 imm
= rtrs_to_io_req_imm(imm
);
1128 buf_id
= req
->permit
->mem_id
;
1130 req
->sg_size
= sizeof(*msg
);
1131 req
->sg_size
+= le16_to_cpu(msg
->sg_cnt
) * sizeof(struct rtrs_sg_desc
);
1132 req
->sg_size
+= req
->usr_len
;
1135 * Update stats now, after request is successfully sent it is not
1136 * safe anymore to touch it.
1138 rtrs_clt_update_all_stats(req
, READ
);
1140 ret
= rtrs_post_send_rdma(req
->con
, req
, &sess
->rbufs
[buf_id
],
1141 req
->data_len
, imm
, wr
);
1142 if (unlikely(ret
)) {
1143 rtrs_err(s
, "Read request failed: %d\n", ret
);
1144 if (sess
->clt
->mp_policy
== MP_POLICY_MIN_INFLIGHT
)
1145 atomic_dec(&sess
->stats
->inflight
);
1146 req
->need_inv
= false;
1148 ib_dma_unmap_sg(dev
->ib_dev
, req
->sglist
,
1149 req
->sg_cnt
, req
->dir
);
1156 * rtrs_clt_failover_req() Try to find an active path for a failed request
1158 * @fail_req: a failed io request.
1160 static int rtrs_clt_failover_req(struct rtrs_clt
*clt
,
1161 struct rtrs_clt_io_req
*fail_req
)
1163 struct rtrs_clt_sess
*alive_sess
;
1164 struct rtrs_clt_io_req
*req
;
1165 int err
= -ECONNABORTED
;
1169 for (path_it_init(&it
, clt
);
1170 (alive_sess
= it
.next_path(&it
)) && it
.i
< it
.clt
->paths_num
;
1172 if (unlikely(READ_ONCE(alive_sess
->state
) !=
1173 RTRS_CLT_CONNECTED
))
1175 req
= rtrs_clt_get_copy_req(alive_sess
, fail_req
);
1176 if (req
->dir
== DMA_TO_DEVICE
)
1177 err
= rtrs_clt_write_req(req
);
1179 err
= rtrs_clt_read_req(req
);
1180 if (unlikely(err
)) {
1181 req
->in_use
= false;
1185 rtrs_clt_inc_failover_cnt(alive_sess
->stats
);
1188 path_it_deinit(&it
);
1194 static void fail_all_outstanding_reqs(struct rtrs_clt_sess
*sess
)
1196 struct rtrs_clt
*clt
= sess
->clt
;
1197 struct rtrs_clt_io_req
*req
;
1202 for (i
= 0; i
< sess
->queue_depth
; ++i
) {
1203 req
= &sess
->reqs
[i
];
1208 * Safely (without notification) complete failed request.
1209 * After completion this request is still useble and can
1210 * be failovered to another path.
1212 complete_rdma_req(req
, -ECONNABORTED
, false, true);
1214 err
= rtrs_clt_failover_req(clt
, req
);
1216 /* Failover failed, notify anyway */
1217 req
->conf(req
->priv
, err
);
1221 static void free_sess_reqs(struct rtrs_clt_sess
*sess
)
1223 struct rtrs_clt_io_req
*req
;
1228 for (i
= 0; i
< sess
->queue_depth
; ++i
) {
1229 req
= &sess
->reqs
[i
];
1231 ib_dereg_mr(req
->mr
);
1233 rtrs_iu_free(req
->iu
, sess
->s
.dev
->ib_dev
, 1);
1239 static int alloc_sess_reqs(struct rtrs_clt_sess
*sess
)
1241 struct rtrs_clt_io_req
*req
;
1242 struct rtrs_clt
*clt
= sess
->clt
;
1243 int i
, err
= -ENOMEM
;
1245 sess
->reqs
= kcalloc(sess
->queue_depth
, sizeof(*sess
->reqs
),
1250 for (i
= 0; i
< sess
->queue_depth
; ++i
) {
1251 req
= &sess
->reqs
[i
];
1252 req
->iu
= rtrs_iu_alloc(1, sess
->max_hdr_size
, GFP_KERNEL
,
1253 sess
->s
.dev
->ib_dev
,
1255 rtrs_clt_rdma_done
);
1259 req
->sge
= kmalloc_array(clt
->max_segments
+ 1,
1260 sizeof(*req
->sge
), GFP_KERNEL
);
1264 req
->mr
= ib_alloc_mr(sess
->s
.dev
->ib_pd
, IB_MR_TYPE_MEM_REG
,
1265 sess
->max_pages_per_mr
);
1266 if (IS_ERR(req
->mr
)) {
1267 err
= PTR_ERR(req
->mr
);
1269 pr_err("Failed to alloc sess->max_pages_per_mr %d\n",
1270 sess
->max_pages_per_mr
);
1274 init_completion(&req
->inv_comp
);
1280 free_sess_reqs(sess
);
static int alloc_permits(struct rtrs_clt *clt)
{
	unsigned int chunk_bits;
	int err, i;

	clt->permits_map = kcalloc(BITS_TO_LONGS(clt->queue_depth),
				   sizeof(long), GFP_KERNEL);
	if (!clt->permits_map) {
		err = -ENOMEM;
		goto out_err;
	}
	clt->permits = kcalloc(clt->queue_depth, permit_size(clt), GFP_KERNEL);
	if (!clt->permits) {
		err = -ENOMEM;
		goto err_map;
	}
	chunk_bits = ilog2(clt->queue_depth - 1) + 1;
	for (i = 0; i < clt->queue_depth; i++) {
		struct rtrs_permit *permit;

		permit = get_permit(clt, i);
		permit->mem_id = i;
		permit->mem_off = i << (MAX_IMM_PAYL_BITS - chunk_bits);
	}

	return 0;

err_map:
	kfree(clt->permits_map);
	clt->permits_map = NULL;
out_err:
	return err;
}
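
/*
 * Worked example (assuming a queue depth of 512): chunk_bits becomes
 * ilog2(511) + 1 = 9, so each permit's mem_off places its index in the top
 * 9 bits of the immediate payload while the remaining low bits are left to
 * encode the byte offset inside the corresponding memory chunk.
 */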
static void free_permits(struct rtrs_clt *clt)
{
	kfree(clt->permits_map);
	clt->permits_map = NULL;
	kfree(clt->permits);
	clt->permits = NULL;
}
static void query_fast_reg_mode(struct rtrs_clt_sess *sess)
{
	struct ib_device *ib_dev;
	u64 max_pages_per_mr;
	int mr_page_shift;

	ib_dev = sess->s.dev->ib_dev;

	/*
	 * Use the smallest page size supported by the HCA, down to a
	 * minimum of 4096 bytes. We're unlikely to build large sglists
	 * out of smaller entries.
	 */
	mr_page_shift      = max(12, ffs(ib_dev->attrs.page_size_cap) - 1);
	max_pages_per_mr   = ib_dev->attrs.max_mr_size;
	do_div(max_pages_per_mr, (1ull << mr_page_shift));
	sess->max_pages_per_mr =
		min3(sess->max_pages_per_mr, (u32)max_pages_per_mr,
		     ib_dev->attrs.max_fast_reg_page_list_len);
	sess->max_send_sge = ib_dev->attrs.max_send_sge;
}
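
/*
 * Example (sketch, assuming an HCA that advertises 4K pages and a 4GB
 * max_mr_size): mr_page_shift is 12 and max_pages_per_mr becomes
 * 4GB >> 12 = ~1M pages, which min3() above then clamps against the
 * session's own limit and the device's max_fast_reg_page_list_len.
 */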
1349 static bool rtrs_clt_change_state_get_old(struct rtrs_clt_sess
*sess
,
1350 enum rtrs_clt_state new_state
,
1351 enum rtrs_clt_state
*old_state
)
1355 spin_lock_irq(&sess
->state_wq
.lock
);
1356 *old_state
= sess
->state
;
1357 changed
= __rtrs_clt_change_state(sess
, new_state
);
1358 spin_unlock_irq(&sess
->state_wq
.lock
);
1363 static bool rtrs_clt_change_state(struct rtrs_clt_sess
*sess
,
1364 enum rtrs_clt_state new_state
)
1366 enum rtrs_clt_state old_state
;
1368 return rtrs_clt_change_state_get_old(sess
, new_state
, &old_state
);
1371 static void rtrs_clt_hb_err_handler(struct rtrs_con
*c
)
1373 struct rtrs_clt_con
*con
= container_of(c
, typeof(*con
), c
);
1375 rtrs_rdma_error_recovery(con
);
1378 static void rtrs_clt_init_hb(struct rtrs_clt_sess
*sess
)
1380 rtrs_init_hb(&sess
->s
, &io_comp_cqe
,
1381 RTRS_HB_INTERVAL_MS
,
1383 rtrs_clt_hb_err_handler
,
1387 static void rtrs_clt_start_hb(struct rtrs_clt_sess
*sess
)
1389 rtrs_start_hb(&sess
->s
);
1392 static void rtrs_clt_stop_hb(struct rtrs_clt_sess
*sess
)
1394 rtrs_stop_hb(&sess
->s
);
1397 static void rtrs_clt_reconnect_work(struct work_struct
*work
);
1398 static void rtrs_clt_close_work(struct work_struct
*work
);
1400 static struct rtrs_clt_sess
*alloc_sess(struct rtrs_clt
*clt
,
1401 const struct rtrs_addr
*path
,
1402 size_t con_num
, u16 max_segments
,
1403 size_t max_segment_size
)
1405 struct rtrs_clt_sess
*sess
;
1409 sess
= kzalloc(sizeof(*sess
), GFP_KERNEL
);
1413 /* Extra connection for user messages */
1416 sess
->s
.con
= kcalloc(con_num
, sizeof(*sess
->s
.con
), GFP_KERNEL
);
1420 sess
->stats
= kzalloc(sizeof(*sess
->stats
), GFP_KERNEL
);
1424 mutex_init(&sess
->init_mutex
);
1425 uuid_gen(&sess
->s
.uuid
);
1426 memcpy(&sess
->s
.dst_addr
, path
->dst
,
1427 rdma_addr_size((struct sockaddr
*)path
->dst
));
1430 * rdma_resolve_addr() passes src_addr to cma_bind_addr, which
1431 * checks the sa_family to be non-zero. If user passed src_addr=NULL
1432 * the sess->src_addr will contain only zeros, which is then fine.
1435 memcpy(&sess
->s
.src_addr
, path
->src
,
1436 rdma_addr_size((struct sockaddr
*)path
->src
));
1437 strlcpy(sess
->s
.sessname
, clt
->sessname
, sizeof(sess
->s
.sessname
));
1438 sess
->s
.con_num
= con_num
;
1440 sess
->max_pages_per_mr
= max_segments
* max_segment_size
>> 12;
1441 init_waitqueue_head(&sess
->state_wq
);
1442 sess
->state
= RTRS_CLT_CONNECTING
;
1443 atomic_set(&sess
->connected_cnt
, 0);
1444 INIT_WORK(&sess
->close_work
, rtrs_clt_close_work
);
1445 INIT_DELAYED_WORK(&sess
->reconnect_dwork
, rtrs_clt_reconnect_work
);
1446 rtrs_clt_init_hb(sess
);
1448 sess
->mp_skip_entry
= alloc_percpu(typeof(*sess
->mp_skip_entry
));
1449 if (!sess
->mp_skip_entry
)
1450 goto err_free_stats
;
1452 for_each_possible_cpu(cpu
)
1453 INIT_LIST_HEAD(per_cpu_ptr(sess
->mp_skip_entry
, cpu
));
1455 err
= rtrs_clt_init_stats(sess
->stats
);
1457 goto err_free_percpu
;
1462 free_percpu(sess
->mp_skip_entry
);
1470 return ERR_PTR(err
);
1473 void free_sess(struct rtrs_clt_sess
*sess
)
1475 free_percpu(sess
->mp_skip_entry
);
1476 mutex_destroy(&sess
->init_mutex
);
1482 static int create_con(struct rtrs_clt_sess
*sess
, unsigned int cid
)
1484 struct rtrs_clt_con
*con
;
1486 con
= kzalloc(sizeof(*con
), GFP_KERNEL
);
1490 /* Map first two connections to the first CPU */
1491 con
->cpu
= (cid
? cid
- 1 : 0) % nr_cpu_ids
;
1493 con
->c
.sess
= &sess
->s
;
1494 atomic_set(&con
->io_cnt
, 0);
1495 mutex_init(&con
->con_mutex
);
1497 sess
->s
.con
[cid
] = &con
->c
;
1502 static void destroy_con(struct rtrs_clt_con
*con
)
1504 struct rtrs_clt_sess
*sess
= to_clt_sess(con
->c
.sess
);
1506 sess
->s
.con
[con
->c
.cid
] = NULL
;
1507 mutex_destroy(&con
->con_mutex
);
1511 static int create_con_cq_qp(struct rtrs_clt_con
*con
)
1513 struct rtrs_clt_sess
*sess
= to_clt_sess(con
->c
.sess
);
1516 struct rtrs_msg_rkey_rsp
*rsp
;
1518 lockdep_assert_held(&con
->con_mutex
);
1519 if (con
->c
.cid
== 0) {
1521 * One completion for each receive and two for each send
1522 * (send request + registration)
1523 * + 2 for drain and heartbeat
1524 * in case qp gets into error state
1526 wr_queue_size
= SERVICE_CON_QUEUE_DEPTH
* 3 + 2;
1527 /* We must be the first here */
1528 if (WARN_ON(sess
->s
.dev
))
1532 * The whole session uses device from user connection.
1533 * Be careful not to close user connection before ib dev
1534 * is gracefully put.
1536 sess
->s
.dev
= rtrs_ib_dev_find_or_add(con
->c
.cm_id
->device
,
1540 "rtrs_ib_dev_find_get_or_add(): no memory\n");
1543 sess
->s
.dev_ref
= 1;
1544 query_fast_reg_mode(sess
);
1547 * Here we assume that session members are correctly set.
1548 * This is always true if user connection (cid == 0) is
1549 * established first.
1551 if (WARN_ON(!sess
->s
.dev
))
1553 if (WARN_ON(!sess
->queue_depth
))
1556 /* Shared between connections */
1559 min_t(int, sess
->s
.dev
->ib_dev
->attrs
.max_qp_wr
,
1560 /* QD * (REQ + RSP + FR REGS or INVS) + drain */
1561 sess
->queue_depth
* 3 + 1);
1563 /* alloc iu to recv new rkey reply when server reports flags set */
1564 if (sess
->flags
== RTRS_MSG_NEW_RKEY_F
|| con
->c
.cid
== 0) {
1565 con
->rsp_ius
= rtrs_iu_alloc(wr_queue_size
, sizeof(*rsp
),
1566 GFP_KERNEL
, sess
->s
.dev
->ib_dev
,
1568 rtrs_clt_rdma_done
);
1571 con
->queue_size
= wr_queue_size
;
1573 cq_vector
= con
->cpu
% sess
->s
.dev
->ib_dev
->num_comp_vectors
;
1574 err
= rtrs_cq_qp_create(&sess
->s
, &con
->c
, sess
->max_send_sge
,
1575 cq_vector
, wr_queue_size
, wr_queue_size
,
1578 * In case of error we do not bother to clean previous allocations,
1579 * since destroy_con_cq_qp() must be called.
1584 static void destroy_con_cq_qp(struct rtrs_clt_con
*con
)
1586 struct rtrs_clt_sess
*sess
= to_clt_sess(con
->c
.sess
);
1589 * Be careful here: destroy_con_cq_qp() can be called even
1590 * create_con_cq_qp() failed, see comments there.
1592 lockdep_assert_held(&con
->con_mutex
);
1593 rtrs_cq_qp_destroy(&con
->c
);
1595 rtrs_iu_free(con
->rsp_ius
, sess
->s
.dev
->ib_dev
, con
->queue_size
);
1596 con
->rsp_ius
= NULL
;
1597 con
->queue_size
= 0;
1599 if (sess
->s
.dev_ref
&& !--sess
->s
.dev_ref
) {
1600 rtrs_ib_dev_put(sess
->s
.dev
);
1605 static void stop_cm(struct rtrs_clt_con
*con
)
1607 rdma_disconnect(con
->c
.cm_id
);
1609 ib_drain_qp(con
->c
.qp
);
1612 static void destroy_cm(struct rtrs_clt_con
*con
)
1614 rdma_destroy_id(con
->c
.cm_id
);
1615 con
->c
.cm_id
= NULL
;
1618 static int rtrs_rdma_addr_resolved(struct rtrs_clt_con
*con
)
1620 struct rtrs_sess
*s
= con
->c
.sess
;
1623 mutex_lock(&con
->con_mutex
);
1624 err
= create_con_cq_qp(con
);
1625 mutex_unlock(&con
->con_mutex
);
1627 rtrs_err(s
, "create_con_cq_qp(), err: %d\n", err
);
1630 err
= rdma_resolve_route(con
->c
.cm_id
, RTRS_CONNECT_TIMEOUT_MS
);
1632 rtrs_err(s
, "Resolving route failed, err: %d\n", err
);
1637 static int rtrs_rdma_route_resolved(struct rtrs_clt_con
*con
)
1639 struct rtrs_clt_sess
*sess
= to_clt_sess(con
->c
.sess
);
1640 struct rtrs_clt
*clt
= sess
->clt
;
1641 struct rtrs_msg_conn_req msg
;
1642 struct rdma_conn_param param
;
1646 param
= (struct rdma_conn_param
) {
1648 .rnr_retry_count
= 7,
1649 .private_data
= &msg
,
1650 .private_data_len
= sizeof(msg
),
1653 msg
= (struct rtrs_msg_conn_req
) {
1654 .magic
= cpu_to_le16(RTRS_MAGIC
),
1655 .version
= cpu_to_le16(RTRS_PROTO_VER
),
1656 .cid
= cpu_to_le16(con
->c
.cid
),
1657 .cid_num
= cpu_to_le16(sess
->s
.con_num
),
1658 .recon_cnt
= cpu_to_le16(sess
->s
.recon_cnt
),
1660 uuid_copy(&msg
.sess_uuid
, &sess
->s
.uuid
);
1661 uuid_copy(&msg
.paths_uuid
, &clt
->paths_uuid
);
1663 err
= rdma_connect_locked(con
->c
.cm_id
, ¶m
);
1665 rtrs_err(clt
, "rdma_connect_locked(): %d\n", err
);
1670 static int rtrs_rdma_conn_established(struct rtrs_clt_con
*con
,
1671 struct rdma_cm_event
*ev
)
1673 struct rtrs_clt_sess
*sess
= to_clt_sess(con
->c
.sess
);
1674 struct rtrs_clt
*clt
= sess
->clt
;
1675 const struct rtrs_msg_conn_rsp
*msg
;
1676 u16 version
, queue_depth
;
1680 msg
= ev
->param
.conn
.private_data
;
1681 len
= ev
->param
.conn
.private_data_len
;
1682 if (len
< sizeof(*msg
)) {
1683 rtrs_err(clt
, "Invalid RTRS connection response\n");
1686 if (le16_to_cpu(msg
->magic
) != RTRS_MAGIC
) {
1687 rtrs_err(clt
, "Invalid RTRS magic\n");
1690 version
= le16_to_cpu(msg
->version
);
1691 if (version
>> 8 != RTRS_PROTO_VER_MAJOR
) {
1692 rtrs_err(clt
, "Unsupported major RTRS version: %d, expected %d\n",
1693 version
>> 8, RTRS_PROTO_VER_MAJOR
);
1696 errno
= le16_to_cpu(msg
->errno
);
1698 rtrs_err(clt
, "Invalid RTRS message: errno %d\n",
1702 if (con
->c
.cid
== 0) {
1703 queue_depth
= le16_to_cpu(msg
->queue_depth
);
1705 if (queue_depth
> MAX_SESS_QUEUE_DEPTH
) {
1706 rtrs_err(clt
, "Invalid RTRS message: queue=%d\n",
1710 if (!sess
->rbufs
|| sess
->queue_depth
< queue_depth
) {
1712 sess
->rbufs
= kcalloc(queue_depth
, sizeof(*sess
->rbufs
),
1717 sess
->queue_depth
= queue_depth
;
1718 sess
->max_hdr_size
= le32_to_cpu(msg
->max_hdr_size
);
1719 sess
->max_io_size
= le32_to_cpu(msg
->max_io_size
);
1720 sess
->flags
= le32_to_cpu(msg
->flags
);
1721 sess
->chunk_size
= sess
->max_io_size
+ sess
->max_hdr_size
;
1724 * Global queue depth and IO size is always a minimum.
1725 * If while a reconnection server sends us a value a bit
1726 * higher - client does not care and uses cached minimum.
1728 * Since we can have several sessions (paths) restablishing
1729 * connections in parallel, use lock.
1731 mutex_lock(&clt
->paths_mutex
);
1732 clt
->queue_depth
= min_not_zero(sess
->queue_depth
,
1734 clt
->max_io_size
= min_not_zero(sess
->max_io_size
,
1736 mutex_unlock(&clt
->paths_mutex
);
1739 * Cache the hca_port and hca_name for sysfs
1741 sess
->hca_port
= con
->c
.cm_id
->port_num
;
1742 scnprintf(sess
->hca_name
, sizeof(sess
->hca_name
),
1743 sess
->s
.dev
->ib_dev
->name
);
1744 sess
->s
.src_addr
= con
->c
.cm_id
->route
.addr
.src_addr
;
1750 static inline void flag_success_on_conn(struct rtrs_clt_con
*con
)
1752 struct rtrs_clt_sess
*sess
= to_clt_sess(con
->c
.sess
);
1754 atomic_inc(&sess
->connected_cnt
);
1758 static int rtrs_rdma_conn_rejected(struct rtrs_clt_con
*con
,
1759 struct rdma_cm_event
*ev
)
1761 struct rtrs_sess
*s
= con
->c
.sess
;
1762 const struct rtrs_msg_conn_rsp
*msg
;
1763 const char *rej_msg
;
1767 status
= ev
->status
;
1768 rej_msg
= rdma_reject_msg(con
->c
.cm_id
, status
);
1769 msg
= rdma_consumer_reject_data(con
->c
.cm_id
, ev
, &data_len
);
1771 if (msg
&& data_len
>= sizeof(*msg
)) {
1772 errno
= (int16_t)le16_to_cpu(msg
->errno
);
1773 if (errno
== -EBUSY
)
1775 "Previous session is still exists on the server, please reconnect later\n");
1778 "Connect rejected: status %d (%s), rtrs errno %d\n",
1779 status
, rej_msg
, errno
);
1782 "Connect rejected but with malformed message: status %d (%s)\n",
1789 static void rtrs_clt_close_conns(struct rtrs_clt_sess
*sess
, bool wait
)
1791 if (rtrs_clt_change_state(sess
, RTRS_CLT_CLOSING
))
1792 queue_work(rtrs_wq
, &sess
->close_work
);
1794 flush_work(&sess
->close_work
);
1797 static inline void flag_error_on_conn(struct rtrs_clt_con
*con
, int cm_err
)
1799 if (con
->cm_err
== 1) {
1800 struct rtrs_clt_sess
*sess
;
1802 sess
= to_clt_sess(con
->c
.sess
);
1803 if (atomic_dec_and_test(&sess
->connected_cnt
))
1805 wake_up(&sess
->state_wq
);
1807 con
->cm_err
= cm_err
;
1810 static int rtrs_clt_rdma_cm_handler(struct rdma_cm_id
*cm_id
,
1811 struct rdma_cm_event
*ev
)
1813 struct rtrs_clt_con
*con
= cm_id
->context
;
1814 struct rtrs_sess
*s
= con
->c
.sess
;
1815 struct rtrs_clt_sess
*sess
= to_clt_sess(s
);
1818 switch (ev
->event
) {
1819 case RDMA_CM_EVENT_ADDR_RESOLVED
:
1820 cm_err
= rtrs_rdma_addr_resolved(con
);
1822 case RDMA_CM_EVENT_ROUTE_RESOLVED
:
1823 cm_err
= rtrs_rdma_route_resolved(con
);
1825 case RDMA_CM_EVENT_ESTABLISHED
:
1826 cm_err
= rtrs_rdma_conn_established(con
, ev
);
1827 if (likely(!cm_err
)) {
1829 * Report success and wake up. Here we abuse state_wq,
1830 * i.e. wake up without state change, but we set cm_err.
1832 flag_success_on_conn(con
);
1833 wake_up(&sess
->state_wq
);
1837 case RDMA_CM_EVENT_REJECTED
:
1838 cm_err
= rtrs_rdma_conn_rejected(con
, ev
);
1840 case RDMA_CM_EVENT_DISCONNECTED
:
1841 /* No message for disconnecting */
1842 cm_err
= -ECONNRESET
;
1844 case RDMA_CM_EVENT_CONNECT_ERROR
:
1845 case RDMA_CM_EVENT_UNREACHABLE
:
1846 case RDMA_CM_EVENT_ADDR_CHANGE
:
1847 case RDMA_CM_EVENT_TIMEWAIT_EXIT
:
1848 rtrs_wrn(s
, "CM error event %d\n", ev
->event
);
1849 cm_err
= -ECONNRESET
;
1851 case RDMA_CM_EVENT_ADDR_ERROR
:
1852 case RDMA_CM_EVENT_ROUTE_ERROR
:
1853 rtrs_wrn(s
, "CM error event %d\n", ev
->event
);
1854 cm_err
= -EHOSTUNREACH
;
1856 case RDMA_CM_EVENT_DEVICE_REMOVAL
:
1858 * Device removal is a special case. Queue close and return 0.
1860 rtrs_clt_close_conns(sess
, false);
1863 rtrs_err(s
, "Unexpected RDMA CM event (%d)\n", ev
->event
);
1864 cm_err
= -ECONNRESET
;
1870 * cm error makes sense only on connection establishing,
1871 * in other cases we rely on normal procedure of reconnecting.
1873 flag_error_on_conn(con
, cm_err
);
1874 rtrs_rdma_error_recovery(con
);
1880 static int create_cm(struct rtrs_clt_con
*con
)
1882 struct rtrs_sess
*s
= con
->c
.sess
;
1883 struct rtrs_clt_sess
*sess
= to_clt_sess(s
);
1884 struct rdma_cm_id
*cm_id
;
1887 cm_id
= rdma_create_id(&init_net
, rtrs_clt_rdma_cm_handler
, con
,
1888 sess
->s
.dst_addr
.ss_family
== AF_IB
?
1889 RDMA_PS_IB
: RDMA_PS_TCP
, IB_QPT_RC
);
1890 if (IS_ERR(cm_id
)) {
1891 err
= PTR_ERR(cm_id
);
1892 rtrs_err(s
, "Failed to create CM ID, err: %d\n", err
);
1896 con
->c
.cm_id
= cm_id
;
1898 /* allow the port to be reused */
1899 err
= rdma_set_reuseaddr(cm_id
, 1);
1901 rtrs_err(s
, "Set address reuse failed, err: %d\n", err
);
1904 err
= rdma_resolve_addr(cm_id
, (struct sockaddr
*)&sess
->s
.src_addr
,
1905 (struct sockaddr
*)&sess
->s
.dst_addr
,
1906 RTRS_CONNECT_TIMEOUT_MS
);
1908 rtrs_err(s
, "Failed to resolve address, err: %d\n", err
);
1912 * Combine connection status and session events. This is needed
1913 * for waiting two possible cases: cm_err has something meaningful
1914 * or session state was really changed to error by device removal.
1916 err
= wait_event_interruptible_timeout(
1918 con
->cm_err
|| sess
->state
!= RTRS_CLT_CONNECTING
,
1919 msecs_to_jiffies(RTRS_CONNECT_TIMEOUT_MS
));
1920 if (err
== 0 || err
== -ERESTARTSYS
) {
1923 /* Timedout or interrupted */
1926 if (con
->cm_err
< 0) {
1930 if (READ_ONCE(sess
->state
) != RTRS_CLT_CONNECTING
) {
1931 /* Device removal */
1932 err
= -ECONNABORTED
;
1940 mutex_lock(&con
->con_mutex
);
1941 destroy_con_cq_qp(con
);
1942 mutex_unlock(&con
->con_mutex
);
1949 static void rtrs_clt_sess_up(struct rtrs_clt_sess
*sess
)
1951 struct rtrs_clt
*clt
= sess
->clt
;
1955 * We can fire RECONNECTED event only when all paths were
1956 * connected on rtrs_clt_open(), then each was disconnected
1957 * and the first one connected again. That's why this nasty
1958 * game with counter value.
1961 mutex_lock(&clt
->paths_ev_mutex
);
1962 up
= ++clt
->paths_up
;
1964 * Here it is safe to access paths num directly since up counter
1965 * is greater than MAX_PATHS_NUM only while rtrs_clt_open() is
1966 * in progress, thus paths removals are impossible.
1968 if (up
> MAX_PATHS_NUM
&& up
== MAX_PATHS_NUM
+ clt
->paths_num
)
1969 clt
->paths_up
= clt
->paths_num
;
1971 clt
->link_ev(clt
->priv
, RTRS_CLT_LINK_EV_RECONNECTED
);
1972 mutex_unlock(&clt
->paths_ev_mutex
);
1974 /* Mark session as established */
1975 sess
->established
= true;
1976 sess
->reconnect_attempts
= 0;
1977 sess
->stats
->reconnects
.successful_cnt
++;
1980 static void rtrs_clt_sess_down(struct rtrs_clt_sess
*sess
)
1982 struct rtrs_clt
*clt
= sess
->clt
;
1984 if (!sess
->established
)
1987 sess
->established
= false;
1988 mutex_lock(&clt
->paths_ev_mutex
);
1989 WARN_ON(!clt
->paths_up
);
1990 if (--clt
->paths_up
== 0)
1991 clt
->link_ev(clt
->priv
, RTRS_CLT_LINK_EV_DISCONNECTED
);
1992 mutex_unlock(&clt
->paths_ev_mutex
);
1995 static void rtrs_clt_stop_and_destroy_conns(struct rtrs_clt_sess
*sess
)
1997 struct rtrs_clt_con
*con
;
2000 WARN_ON(READ_ONCE(sess
->state
) == RTRS_CLT_CONNECTED
);
2003 * Possible race with rtrs_clt_open(), when DEVICE_REMOVAL comes
2004 * exactly in between. Start destroying after it finishes.
2006 mutex_lock(&sess
->init_mutex
);
2007 mutex_unlock(&sess
->init_mutex
);
2010 * All IO paths must observe !CONNECTED state before we
2015 rtrs_clt_stop_hb(sess
);
2018 * The order it utterly crucial: firstly disconnect and complete all
2019 * rdma requests with error (thus set in_use=false for requests),
2020 * then fail outstanding requests checking in_use for each, and
2021 * eventually notify upper layer about session disconnection.
2024 for (cid
= 0; cid
< sess
->s
.con_num
; cid
++) {
2025 if (!sess
->s
.con
[cid
])
2027 con
= to_clt_con(sess
->s
.con
[cid
]);
2030 fail_all_outstanding_reqs(sess
);
2031 free_sess_reqs(sess
);
2032 rtrs_clt_sess_down(sess
);
2035 * Wait for graceful shutdown, namely when peer side invokes
2036 * rdma_disconnect(). 'connected_cnt' is decremented only on
2037 * CM events, thus if other side had crashed and hb has detected
2038 * something is wrong, here we will stuck for exactly timeout ms,
2039 * since CM does not fire anything. That is fine, we are not in
2042 wait_event_timeout(sess
->state_wq
, !atomic_read(&sess
->connected_cnt
),
2043 msecs_to_jiffies(RTRS_CONNECT_TIMEOUT_MS
));
2045 for (cid
= 0; cid
< sess
->s
.con_num
; cid
++) {
2046 if (!sess
->s
.con
[cid
])
2048 con
= to_clt_con(sess
->s
.con
[cid
]);
2049 mutex_lock(&con
->con_mutex
);
2050 destroy_con_cq_qp(con
);
2051 mutex_unlock(&con
->con_mutex
);
2057 static inline bool xchg_sessions(struct rtrs_clt_sess __rcu
**rcu_ppcpu_path
,
2058 struct rtrs_clt_sess
*sess
,
2059 struct rtrs_clt_sess
*next
)
2061 struct rtrs_clt_sess
**ppcpu_path
;
2063 /* Call cmpxchg() without sparse warnings */
2064 ppcpu_path
= (typeof(ppcpu_path
))rcu_ppcpu_path
;
2065 return sess
== cmpxchg(ppcpu_path
, sess
, next
);
2068 static void rtrs_clt_remove_path_from_arr(struct rtrs_clt_sess
*sess
)
2070 struct rtrs_clt
*clt
= sess
->clt
;
2071 struct rtrs_clt_sess
*next
;
2072 bool wait_for_grace
= false;
2075 mutex_lock(&clt
->paths_mutex
);
2076 list_del_rcu(&sess
->s
.entry
);
2078 /* Make sure everybody observes path removal. */
2082 * At this point nobody sees @sess in the list, but still we have
2083 * dangling pointer @pcpu_path which _can_ point to @sess. Since
2084 * nobody can observe @sess in the list, we guarantee that IO path
2085 * will not assign @sess to @pcpu_path, i.e. @pcpu_path can be equal
2086 * to @sess, but can never again become @sess.
2090 * Decrement paths number only after grace period, because
2091 * caller of do_each_path() must firstly observe list without
2092 * path and only then decremented paths number.
2094 * Otherwise there can be the following situation:
2095 * o Two paths exist and IO is coming.
2096 * o One path is removed:
2098 * do_each_path(): rtrs_clt_remove_path_from_arr():
2099 * path = get_next_path()
2100 * ^^^ list_del_rcu(path)
2101 * [!CONNECTED path] clt->paths_num--
2103 * load clt->paths_num from 2 to 1
2107 * path is observed as !CONNECTED, but do_each_path() loop
2108 * ends, because expression i < clt->paths_num is false.
2113 * Get @next connection from current @sess which is going to be
2114 * removed. If @sess is the last element, then @next is NULL.
2117 next
= list_next_or_null_rr_rcu(&clt
->paths_list
, &sess
->s
.entry
,
2118 typeof(*next
), s
.entry
);
2122 * @pcpu paths can still point to the path which is going to be
2123 * removed, so change the pointer manually.
2125 for_each_possible_cpu(cpu
) {
2126 struct rtrs_clt_sess __rcu
**ppcpu_path
;
2128 ppcpu_path
= per_cpu_ptr(clt
->pcpu_path
, cpu
);
2129 if (rcu_dereference_protected(*ppcpu_path
,
2130 lockdep_is_held(&clt
->paths_mutex
)) != sess
)
2132 * synchronize_rcu() was called just after deleting
2133 * entry from the list, thus IO code path cannot
2134 * change pointer back to the pointer which is going
2135 * to be removed, we are safe here.
2140 * We race with IO code path, which also changes pointer,
2141 * thus we have to be careful not to overwrite it.
2143 if (xchg_sessions(ppcpu_path
, sess
, next
))
2145 * @ppcpu_path was successfully replaced with @next,
2146 * that means that someone could also pick up the
2147 * @sess and dereferencing it right now, so wait for
2148 * a grace period is required.
2150 wait_for_grace
= true;
2155 mutex_unlock(&clt
->paths_mutex
);
2158 static void rtrs_clt_add_path_to_arr(struct rtrs_clt_sess
*sess
)
2160 struct rtrs_clt
*clt
= sess
->clt
;
2162 mutex_lock(&clt
->paths_mutex
);
2165 list_add_tail_rcu(&sess
->s
.entry
, &clt
->paths_list
);
2166 mutex_unlock(&clt
->paths_mutex
);
2169 static void rtrs_clt_close_work(struct work_struct
*work
)
2171 struct rtrs_clt_sess
*sess
;
2173 sess
= container_of(work
, struct rtrs_clt_sess
, close_work
);
2175 cancel_delayed_work_sync(&sess
->reconnect_dwork
);
2176 rtrs_clt_stop_and_destroy_conns(sess
);
2177 rtrs_clt_change_state(sess
, RTRS_CLT_CLOSED
);
2180 static int init_conns(struct rtrs_clt_sess
*sess
)
2186 * On every new session connections increase reconnect counter
2187 * to avoid clashes with previous sessions not yet closed
2188 * sessions on a server side.
2190 sess
->s
.recon_cnt
++;
2192 /* Establish all RDMA connections */
2193 for (cid
= 0; cid
< sess
->s
.con_num
; cid
++) {
2194 err
= create_con(sess
, cid
);
2198 err
= create_cm(to_clt_con(sess
->s
.con
[cid
]));
2200 destroy_con(to_clt_con(sess
->s
.con
[cid
]));
2204 err
= alloc_sess_reqs(sess
);
2208 rtrs_clt_start_hb(sess
);
2214 struct rtrs_clt_con
*con
= to_clt_con(sess
->s
.con
[cid
]);
2218 mutex_lock(&con
->con_mutex
);
2219 destroy_con_cq_qp(con
);
2220 mutex_unlock(&con
->con_mutex
);
2225 * If we've never taken async path and got an error, say,
2226 * doing rdma_resolve_addr(), switch to CONNECTION_ERR state
2227 * manually to keep reconnecting.
2229 rtrs_clt_change_state(sess
, RTRS_CLT_CONNECTING_ERR
);
2234 static void rtrs_clt_info_req_done(struct ib_cq
*cq
, struct ib_wc
*wc
)
2236 struct rtrs_clt_con
*con
= cq
->cq_context
;
2237 struct rtrs_clt_sess
*sess
= to_clt_sess(con
->c
.sess
);
2240 iu
= container_of(wc
->wr_cqe
, struct rtrs_iu
, cqe
);
2241 rtrs_iu_free(iu
, sess
->s
.dev
->ib_dev
, 1);
2243 if (unlikely(wc
->status
!= IB_WC_SUCCESS
)) {
2244 rtrs_err(sess
->clt
, "Sess info request send failed: %s\n",
2245 ib_wc_status_msg(wc
->status
));
2246 rtrs_clt_change_state(sess
, RTRS_CLT_CONNECTING_ERR
);
2250 rtrs_clt_update_wc_stats(con
);
2253 static int process_info_rsp(struct rtrs_clt_sess
*sess
,
2254 const struct rtrs_msg_info_rsp
*msg
)
2256 unsigned int sg_cnt
, total_len
;
2259 sg_cnt
= le16_to_cpu(msg
->sg_cnt
);
2260 if (unlikely(!sg_cnt
|| (sess
->queue_depth
% sg_cnt
))) {
2261 rtrs_err(sess
->clt
, "Incorrect sg_cnt %d, is not multiple\n",
2267 * Check if IB immediate data size is enough to hold the mem_id and
2268 * the offset inside the memory chunk.
2270 if (unlikely((ilog2(sg_cnt
- 1) + 1) +
2271 (ilog2(sess
->chunk_size
- 1) + 1) >
2272 MAX_IMM_PAYL_BITS
)) {
2274 "RDMA immediate size (%db) not enough to encode %d buffers of size %dB\n",
2275 MAX_IMM_PAYL_BITS
, sg_cnt
, sess
->chunk_size
);
2279 for (sgi
= 0, i
= 0; sgi
< sg_cnt
&& i
< sess
->queue_depth
; sgi
++) {
2280 const struct rtrs_sg_desc
*desc
= &msg
->desc
[sgi
];
2284 addr
= le64_to_cpu(desc
->addr
);
2285 rkey
= le32_to_cpu(desc
->key
);
2286 len
= le32_to_cpu(desc
->len
);
2290 if (unlikely(!len
|| (len
% sess
->chunk_size
))) {
2291 rtrs_err(sess
->clt
, "Incorrect [%d].len %d\n", sgi
,
2295 for ( ; len
&& i
< sess
->queue_depth
; i
++) {
2296 sess
->rbufs
[i
].addr
= addr
;
2297 sess
->rbufs
[i
].rkey
= rkey
;
2299 len
-= sess
->chunk_size
;
2300 addr
+= sess
->chunk_size
;
2304 if (unlikely(sgi
!= sg_cnt
|| i
!= sess
->queue_depth
)) {
2305 rtrs_err(sess
->clt
, "Incorrect sg vector, not fully mapped\n");
2308 if (unlikely(total_len
!= sess
->chunk_size
* sess
->queue_depth
)) {
2309 rtrs_err(sess
->clt
, "Incorrect total_len %d\n", total_len
);
static void rtrs_clt_info_rsp_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rtrs_clt_con *con = cq->cq_context;
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
	struct rtrs_msg_info_rsp *msg;
	enum rtrs_clt_state state;
	struct rtrs_iu *iu;
	size_t rx_sz;
	int err;

	state = RTRS_CLT_CONNECTING_ERR;

	WARN_ON(con->c.cid);
	iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
	if (unlikely(wc->status != IB_WC_SUCCESS)) {
		rtrs_err(sess->clt, "Sess info response recv failed: %s\n",
			 ib_wc_status_msg(wc->status));
		goto out;
	}
	WARN_ON(wc->opcode != IB_WC_RECV);

	if (unlikely(wc->byte_len < sizeof(*msg))) {
		rtrs_err(sess->clt, "Sess info response is malformed: size %d\n",
			 wc->byte_len);
		goto out;
	}
	ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, iu->dma_addr,
				   iu->size, DMA_FROM_DEVICE);
	msg = iu->buf;
	if (unlikely(le16_to_cpu(msg->type) != RTRS_MSG_INFO_RSP)) {
		rtrs_err(sess->clt, "Sess info response is malformed: type %d\n",
			 le16_to_cpu(msg->type));
		goto out;
	}
	rx_sz  = sizeof(*msg);
	rx_sz += sizeof(msg->desc[0]) * le16_to_cpu(msg->sg_cnt);
	if (unlikely(wc->byte_len < rx_sz)) {
		rtrs_err(sess->clt, "Sess info response is malformed: size %d\n",
			 wc->byte_len);
		goto out;
	}
	err = process_info_rsp(sess, msg);
	if (unlikely(err))
		goto out;

	err = post_recv_sess(sess);
	if (unlikely(err))
		goto out;

	state = RTRS_CLT_CONNECTED;

out:
	rtrs_clt_update_wc_stats(con);
	rtrs_iu_free(iu, sess->s.dev->ib_dev, 1);
	rtrs_clt_change_state(sess, state);
}
static int rtrs_send_sess_info(struct rtrs_clt_sess *sess)
{
	struct rtrs_clt_con *usr_con = to_clt_con(sess->s.con[0]);
	struct rtrs_msg_info_req *msg;
	struct rtrs_iu *tx_iu, *rx_iu;
	size_t rx_sz;
	int err;

	rx_sz  = sizeof(struct rtrs_msg_info_rsp);
	rx_sz += sizeof(u64) * MAX_SESS_QUEUE_DEPTH;

	tx_iu = rtrs_iu_alloc(1, sizeof(struct rtrs_msg_info_req), GFP_KERNEL,
			      sess->s.dev->ib_dev, DMA_TO_DEVICE,
			      rtrs_clt_info_req_done);
	rx_iu = rtrs_iu_alloc(1, rx_sz, GFP_KERNEL, sess->s.dev->ib_dev,
			      DMA_FROM_DEVICE, rtrs_clt_info_rsp_done);
	if (unlikely(!tx_iu || !rx_iu)) {
		err = -ENOMEM;
		goto out;
	}
	/* Prepare for getting info response */
	err = rtrs_iu_post_recv(&usr_con->c, rx_iu);
	if (unlikely(err)) {
		rtrs_err(sess->clt, "rtrs_iu_post_recv(), err: %d\n", err);
		goto out;
	}
	rx_iu = NULL;

	msg = tx_iu->buf;
	msg->type = cpu_to_le16(RTRS_MSG_INFO_REQ);
	memcpy(msg->sessname, sess->s.sessname, sizeof(msg->sessname));

	ib_dma_sync_single_for_device(sess->s.dev->ib_dev, tx_iu->dma_addr,
				      tx_iu->size, DMA_TO_DEVICE);

	/* Send info request */
	err = rtrs_iu_post_send(&usr_con->c, tx_iu, sizeof(*msg), NULL);
	if (unlikely(err)) {
		rtrs_err(sess->clt, "rtrs_iu_post_send(), err: %d\n", err);
		goto out;
	}
	tx_iu = NULL;

	/* Wait for state change */
	wait_event_interruptible_timeout(sess->state_wq,
					 sess->state != RTRS_CLT_CONNECTING,
					 msecs_to_jiffies(
						 RTRS_CONNECT_TIMEOUT_MS));
	if (unlikely(READ_ONCE(sess->state) != RTRS_CLT_CONNECTED)) {
		if (READ_ONCE(sess->state) == RTRS_CLT_CONNECTING_ERR)
			err = -ECONNRESET;
		else
			err = -ETIMEDOUT;
	}

out:
	if (tx_iu)
		rtrs_iu_free(tx_iu, sess->s.dev->ib_dev, 1);
	if (rx_iu)
		rtrs_iu_free(rx_iu, sess->s.dev->ib_dev, 1);
	if (unlikely(err))
		/* If we've never taken async path because of malloc problems */
		rtrs_clt_change_state(sess, RTRS_CLT_CONNECTING_ERR);

	return err;
}
/**
 * init_sess() - establishes all session connections and does handshake
 * @sess: client session.
 * In case of error a full close or reconnect procedure should be taken,
 * because the async reconnect or close work may already have been started.
 */
static int init_sess(struct rtrs_clt_sess *sess)
{
	int err;

	mutex_lock(&sess->init_mutex);
	err = init_conns(sess);
	if (err) {
		rtrs_err(sess->clt, "init_conns(), err: %d\n", err);
		goto out;
	}
	err = rtrs_send_sess_info(sess);
	if (err) {
		rtrs_err(sess->clt, "rtrs_send_sess_info(), err: %d\n", err);
		goto out;
	}
	rtrs_clt_sess_up(sess);
out:
	mutex_unlock(&sess->init_mutex);

	return err;
}
static void rtrs_clt_reconnect_work(struct work_struct *work)
{
	struct rtrs_clt_sess *sess;
	struct rtrs_clt *clt;
	unsigned int delay_ms;
	int err;

	sess = container_of(to_delayed_work(work), struct rtrs_clt_sess,
			    reconnect_dwork);
	clt = sess->clt;

	if (READ_ONCE(sess->state) != RTRS_CLT_RECONNECTING)
		return;

	if (sess->reconnect_attempts >= clt->max_reconnect_attempts) {
		/* Close a session completely if max attempts is reached */
		rtrs_clt_close_conns(sess, false);
		return;
	}
	sess->reconnect_attempts++;

	/* Stop everything */
	rtrs_clt_stop_and_destroy_conns(sess);
	msleep(RTRS_RECONNECT_BACKOFF);
	if (rtrs_clt_change_state(sess, RTRS_CLT_CONNECTING)) {
		err = init_sess(sess);
		if (err)
			goto reconnect_again;
	}

	return;

reconnect_again:
	if (rtrs_clt_change_state(sess, RTRS_CLT_RECONNECTING)) {
		sess->stats->reconnects.fail_cnt++;
		delay_ms = clt->reconnect_delay_sec * 1000;
		queue_delayed_work(rtrs_wq, &sess->reconnect_dwork,
				   msecs_to_jiffies(delay_ms +
						    prandom_u32() %
						    RTRS_RECONNECT_SEED * 1000));
	}
}
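/*
 * Illustrative timing for the reconnect path above (assumed numbers, not
 * taken from this file): with reconnect_delay_sec = 5 and
 * max_reconnect_attempts = 3, a failed init_sess() re-queues the work after
 * at least delay_ms = 5000 ms plus the random seed term, and after the third
 * failed attempt rtrs_clt_close_conns() closes the path for good.
 */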
static void rtrs_clt_dev_release(struct device *dev)
{
	struct rtrs_clt *clt = container_of(dev, struct rtrs_clt, dev);

	kfree(clt);
}
static struct rtrs_clt *alloc_clt(const char *sessname, size_t paths_num,
				  u16 port, size_t pdu_sz, void *priv,
				  void (*link_ev)(void *priv,
						  enum rtrs_clt_link_ev ev),
				  unsigned int max_segments,
				  size_t max_segment_size,
				  unsigned int reconnect_delay_sec,
				  unsigned int max_reconnect_attempts)
{
	struct rtrs_clt *clt;
	int err;

	if (!paths_num || paths_num > MAX_PATHS_NUM)
		return ERR_PTR(-EINVAL);

	if (strlen(sessname) >= sizeof(clt->sessname))
		return ERR_PTR(-EINVAL);

	clt = kzalloc(sizeof(*clt), GFP_KERNEL);
	if (!clt)
		return ERR_PTR(-ENOMEM);

	clt->pcpu_path = alloc_percpu(typeof(*clt->pcpu_path));
	if (!clt->pcpu_path) {
		kfree(clt);
		return ERR_PTR(-ENOMEM);
	}

	uuid_gen(&clt->paths_uuid);
	INIT_LIST_HEAD_RCU(&clt->paths_list);
	clt->paths_num = paths_num;
	clt->paths_up = MAX_PATHS_NUM;
	clt->port = port;
	clt->pdu_sz = pdu_sz;
	clt->max_segments = max_segments;
	clt->max_segment_size = max_segment_size;
	clt->reconnect_delay_sec = reconnect_delay_sec;
	clt->max_reconnect_attempts = max_reconnect_attempts;
	clt->priv = priv;
	clt->link_ev = link_ev;
	clt->mp_policy = MP_POLICY_MIN_INFLIGHT;
	strlcpy(clt->sessname, sessname, sizeof(clt->sessname));
	init_waitqueue_head(&clt->permits_wait);
	mutex_init(&clt->paths_ev_mutex);
	mutex_init(&clt->paths_mutex);

	clt->dev.class = rtrs_clt_dev_class;
	clt->dev.release = rtrs_clt_dev_release;
	err = dev_set_name(&clt->dev, "%s", sessname);
	if (err) {
		free_percpu(clt->pcpu_path);
		kfree(clt);
		return ERR_PTR(err);
	}
	/*
	 * Suppress user space notification until
	 * sysfs files are created
	 */
	dev_set_uevent_suppress(&clt->dev, true);
	err = device_register(&clt->dev);
	if (err) {
		free_percpu(clt->pcpu_path);
		put_device(&clt->dev);
		return ERR_PTR(err);
	}

	clt->kobj_paths = kobject_create_and_add("paths", &clt->dev.kobj);
	if (!clt->kobj_paths) {
		free_percpu(clt->pcpu_path);
		device_unregister(&clt->dev);
		return ERR_PTR(-ENOMEM);
	}
	err = rtrs_clt_create_sysfs_root_files(clt);
	if (err) {
		free_percpu(clt->pcpu_path);
		kobject_del(clt->kobj_paths);
		kobject_put(clt->kobj_paths);
		device_unregister(&clt->dev);
		return ERR_PTR(err);
	}
	dev_set_uevent_suppress(&clt->dev, false);
	kobject_uevent(&clt->dev.kobj, KOBJ_ADD);

	return clt;
}
static void wait_for_inflight_permits(struct rtrs_clt *clt)
{
	if (clt->permits_map) {
		size_t sz = clt->queue_depth;

		wait_event(clt->permits_wait,
			   find_first_bit(clt->permits_map, sz) >= sz);
	}
}
static void free_clt(struct rtrs_clt *clt)
{
	wait_for_inflight_permits(clt);
	free_permits(clt);
	free_percpu(clt->pcpu_path);
	mutex_destroy(&clt->paths_ev_mutex);
	mutex_destroy(&clt->paths_mutex);
	/* release callback will free clt in last put */
	device_unregister(&clt->dev);
}
/**
 * rtrs_clt_open() - Open a session to an RTRS server
 * @ops: holds the link event callback and the private pointer.
 * @sessname: name of the session
 * @paths: Paths to be established defined by their src and dst addresses
 * @paths_num: Number of elements in the @paths array
 * @port: port to be used by the RTRS session
 * @pdu_sz: Size of extra payload which can be accessed after permit allocation.
 * @reconnect_delay_sec: time between reconnect tries
 * @max_segments: Max. number of segments per IO request
 * @max_segment_size: Max. size of one segment
 * @max_reconnect_attempts: Number of times to reconnect on error before giving
 *			    up, 0 for disabled, -1 for forever
 *
 * Starts session establishment with the rtrs_server. The function can block
 * up to ~2000 ms before it returns.
 *
 * Return a valid pointer on success, otherwise PTR_ERR.
 */
struct rtrs_clt *rtrs_clt_open(struct rtrs_clt_ops *ops,
			       const char *sessname,
			       const struct rtrs_addr *paths,
			       size_t paths_num, u16 port,
			       size_t pdu_sz, u8 reconnect_delay_sec,
			       u16 max_segments,
			       size_t max_segment_size,
			       s16 max_reconnect_attempts)
{
	struct rtrs_clt_sess *sess, *tmp;
	struct rtrs_clt *clt;
	int err, i;

	clt = alloc_clt(sessname, paths_num, port, pdu_sz, ops->priv,
			ops->link_ev,
			max_segments, max_segment_size, reconnect_delay_sec,
			max_reconnect_attempts);
	if (IS_ERR(clt)) {
		err = PTR_ERR(clt);
		goto out;
	}
	for (i = 0; i < paths_num; i++) {
		struct rtrs_clt_sess *sess;

		sess = alloc_sess(clt, &paths[i], nr_cpu_ids,
				  max_segments, max_segment_size);
		if (IS_ERR(sess)) {
			err = PTR_ERR(sess);
			goto close_all_sess;
		}
		list_add_tail_rcu(&sess->s.entry, &clt->paths_list);

		err = init_sess(sess);
		if (err) {
			list_del_rcu(&sess->s.entry);
			rtrs_clt_close_conns(sess, true);
			free_sess(sess);
			goto close_all_sess;
		}

		err = rtrs_clt_create_sess_files(sess);
		if (err) {
			list_del_rcu(&sess->s.entry);
			rtrs_clt_close_conns(sess, true);
			free_sess(sess);
			goto close_all_sess;
		}
	}
	err = alloc_permits(clt);
	if (err)
		goto close_all_sess;

	return clt;

close_all_sess:
	list_for_each_entry_safe(sess, tmp, &clt->paths_list, s.entry) {
		rtrs_clt_destroy_sess_files(sess, NULL);
		rtrs_clt_close_conns(sess, true);
		kobject_put(&sess->kobj);
	}
	rtrs_clt_destroy_sysfs_root_files(clt);
	rtrs_clt_destroy_sysfs_root_folders(clt);
	free_clt(clt);

out:
	return ERR_PTR(err);
}
EXPORT_SYMBOL(rtrs_clt_open);
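/*
 * Minimal usage sketch for rtrs_clt_open()/rtrs_clt_close() (illustrative
 * only, not part of this file; the callback, private pointer and address
 * array are assumptions of the sketch):
 *
 *	static void my_link_ev(void *priv, enum rtrs_clt_link_ev ev)
 *	{
 *		// react to link up/down notifications for the session
 *	}
 *
 *	struct rtrs_clt_ops ops = {
 *		.priv	 = my_priv,
 *		.link_ev = my_link_ev,
 *	};
 *	struct rtrs_clt *clt;
 *
 *	clt = rtrs_clt_open(&ops, "my_session", paths, paths_num, port,
 *			    pdu_sz, reconnect_delay_sec, max_segments,
 *			    max_segment_size, max_reconnect_attempts);
 *	if (IS_ERR(clt))
 *		return PTR_ERR(clt);
 *	...
 *	rtrs_clt_close(clt);
 */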
/**
 * rtrs_clt_close() - Close a session
 * @clt: Session handle. Session is freed upon return.
 */
void rtrs_clt_close(struct rtrs_clt *clt)
{
	struct rtrs_clt_sess *sess, *tmp;

	/* Firstly forbid sysfs access */
	rtrs_clt_destroy_sysfs_root_files(clt);
	rtrs_clt_destroy_sysfs_root_folders(clt);

	/* Now it is safe to iterate over all paths without locks */
	list_for_each_entry_safe(sess, tmp, &clt->paths_list, s.entry) {
		rtrs_clt_destroy_sess_files(sess, NULL);
		rtrs_clt_close_conns(sess, true);
		kobject_put(&sess->kobj);
	}
	free_clt(clt);
}
EXPORT_SYMBOL(rtrs_clt_close);
int rtrs_clt_reconnect_from_sysfs(struct rtrs_clt_sess *sess)
{
	enum rtrs_clt_state old_state;
	int err = -EBUSY;
	bool changed;

	changed = rtrs_clt_change_state_get_old(sess, RTRS_CLT_RECONNECTING,
						&old_state);
	if (changed) {
		sess->reconnect_attempts = 0;
		queue_delayed_work(rtrs_wq, &sess->reconnect_dwork, 0);
	}
	if (changed || old_state == RTRS_CLT_RECONNECTING) {
		/*
		 * flush_delayed_work() queues pending work for immediate
		 * execution, so do the flush if we have queued something
		 * right now or work is pending.
		 */
		flush_delayed_work(&sess->reconnect_dwork);
		err = (READ_ONCE(sess->state) ==
		       RTRS_CLT_CONNECTED ? 0 : -ENOTCONN);
	}

	return err;
}
int rtrs_clt_disconnect_from_sysfs(struct rtrs_clt_sess *sess)
{
	rtrs_clt_close_conns(sess, true);

	return 0;
}
int rtrs_clt_remove_path_from_sysfs(struct rtrs_clt_sess *sess,
				    const struct attribute *sysfs_self)
{
	enum rtrs_clt_state old_state;
	bool changed;

	/*
	 * Continue stopping the path till its state was changed to DEAD or
	 * state was observed as DEAD:
	 * 1. State was changed to DEAD - we were fast and nobody
	 *    invoked rtrs_clt_reconnect(), which can again start
	 *    reconnecting.
	 * 2. State was observed as DEAD - we have someone in parallel
	 *    removing the path.
	 */
	do {
		rtrs_clt_close_conns(sess, true);
		changed = rtrs_clt_change_state_get_old(sess,
							RTRS_CLT_DEAD,
							&old_state);
	} while (!changed && old_state != RTRS_CLT_DEAD);

	if (likely(changed)) {
		rtrs_clt_destroy_sess_files(sess, sysfs_self);
		rtrs_clt_remove_path_from_arr(sess);
		kobject_put(&sess->kobj);
	}

	return 0;
}
void rtrs_clt_set_max_reconnect_attempts(struct rtrs_clt *clt, int value)
{
	clt->max_reconnect_attempts = (unsigned int)value;
}

int rtrs_clt_get_max_reconnect_attempts(const struct rtrs_clt *clt)
{
	return (int)clt->max_reconnect_attempts;
}
/**
 * rtrs_clt_request() - Request data transfer to/from server via RDMA.
 *
 * @dir: READ/WRITE
 * @ops: callback function to be called as confirmation, and the pointer.
 * @clt: Session
 * @permit: Preallocated permit
 * @vec: Message that is sent to server together with the request.
 *	 Sum of len of all @vec elements limited to <= IO_MSG_SIZE.
 *	 Since the msg is copied internally it can be allocated on stack.
 * @nr: Number of elements in @vec.
 * @data_len: length of data sent to/from server
 * @sg: Pages to be sent/received to/from server.
 * @sg_cnt: Number of elements in the @sg
 *
 * Return:
 * 0:		Success
 * <0:		Error
 *
 * On dir=READ rtrs client will request a data transfer from Server to client.
 * The data that the server will respond with will be stored in @sg when
 * the user receives an %RTRS_CLT_RDMA_EV_RDMA_REQUEST_WRITE_COMPL event.
 * On dir=WRITE rtrs client will rdma write data in sg to server side.
 */
int rtrs_clt_request(int dir, struct rtrs_clt_req_ops *ops,
		     struct rtrs_clt *clt, struct rtrs_permit *permit,
		     const struct kvec *vec, size_t nr, size_t data_len,
		     struct scatterlist *sg, unsigned int sg_cnt)
{
	struct rtrs_clt_io_req *req;
	struct rtrs_clt_sess *sess;

	enum dma_data_direction dma_dir;
	int err = -ECONNABORTED, i;
	size_t usr_len, hdr_len;
	struct path_it it;

	/* Get kvec length */
	for (i = 0, usr_len = 0; i < nr; i++)
		usr_len += vec[i].iov_len;

	if (dir == READ) {
		hdr_len = sizeof(struct rtrs_msg_rdma_read) +
			  sg_cnt * sizeof(struct rtrs_sg_desc);
		dma_dir = DMA_FROM_DEVICE;
	} else {
		hdr_len = sizeof(struct rtrs_msg_rdma_write);
		dma_dir = DMA_TO_DEVICE;
	}

	rcu_read_lock();
	for (path_it_init(&it, clt);
	     (sess = it.next_path(&it)) && it.i < it.clt->paths_num; it.i++) {
		if (unlikely(READ_ONCE(sess->state) != RTRS_CLT_CONNECTED))
			continue;

		if (unlikely(usr_len + hdr_len > sess->max_hdr_size)) {
			rtrs_wrn_rl(sess->clt,
				    "%s request failed, user message size is %zu and header length %zu, but max size is %u\n",
				    dir == READ ? "Read" : "Write",
				    usr_len, hdr_len, sess->max_hdr_size);
			err = -EMSGSIZE;
			break;
		}
		req = rtrs_clt_get_req(sess, ops->conf_fn, permit, ops->priv,
				       vec, usr_len, sg, sg_cnt, data_len,
				       dma_dir);
		if (dir == READ)
			err = rtrs_clt_read_req(req);
		else
			err = rtrs_clt_write_req(req);
		if (unlikely(err)) {
			req->in_use = false;
			continue;
		}
		/* Success path */
		break;
	}
	path_it_deinit(&it);
	rcu_read_unlock();

	return err;
}
EXPORT_SYMBOL(rtrs_clt_request);
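/*
 * Minimal usage sketch for rtrs_clt_request() (illustrative only; my_req,
 * my_conf_fn and the permit wait flag are assumptions of this sketch):
 *
 *	struct kvec vec = { .iov_base = usr_msg, .iov_len = usr_msg_len };
 *	struct rtrs_clt_req_ops req_ops = {
 *		.priv	 = my_req,
 *		.conf_fn = my_conf_fn,	// invoked on completion
 *	};
 *	struct rtrs_permit *permit;
 *	int err;
 *
 *	permit = rtrs_clt_get_permit(clt, RTRS_IO_CON, true);
 *	if (!permit)
 *		return -EBUSY;
 *	err = rtrs_clt_request(READ, &req_ops, clt, permit, &vec, 1,
 *			       data_len, sg, sg_cnt);
 *	if (err)
 *		rtrs_clt_put_permit(clt, permit);
 */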
/**
 * rtrs_clt_query() - queries RTRS session attributes
 * @clt: session pointer
 * @attr: query results for session attributes.
 * Returns:
 *    0 on success
 *    -ECOMM	no connection to the server
 */
int rtrs_clt_query(struct rtrs_clt *clt, struct rtrs_attrs *attr)
{
	if (!rtrs_clt_is_connected(clt))
		return -ECOMM;

	attr->queue_depth = clt->queue_depth;
	attr->max_io_size = clt->max_io_size;
	attr->sess_kobj = &clt->dev.kobj;
	strlcpy(attr->sessname, clt->sessname, sizeof(attr->sessname));

	return 0;
}
EXPORT_SYMBOL(rtrs_clt_query);
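/*
 * Illustrative use of rtrs_clt_query() (a sketch, not part of this file):
 *
 *	struct rtrs_attrs attr;
 *
 *	if (rtrs_clt_query(clt, &attr))
 *		return -ECOMM;
 *	// attr.queue_depth bounds the number of permits in flight,
 *	// attr.max_io_size bounds data_len passed to rtrs_clt_request()
 */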
int rtrs_clt_create_path_from_sysfs(struct rtrs_clt *clt,
				    struct rtrs_addr *addr)
{
	struct rtrs_clt_sess *sess;
	int err;

	sess = alloc_sess(clt, addr, nr_cpu_ids, clt->max_segments,
			  clt->max_segment_size);
	if (IS_ERR(sess))
		return PTR_ERR(sess);

	/*
	 * It is totally safe to add a path in CONNECTING state: incoming
	 * IO will never grab it.  Also it is very important to add the
	 * path before init, since init fires the LINK_CONNECTED event.
	 */
	rtrs_clt_add_path_to_arr(sess);

	err = init_sess(sess);
	if (err)
		goto close_sess;

	err = rtrs_clt_create_sess_files(sess);
	if (err)
		goto close_sess;

	return 0;

close_sess:
	rtrs_clt_remove_path_from_arr(sess);
	rtrs_clt_close_conns(sess, true);
	free_sess(sess);

	return err;
}
static int rtrs_clt_ib_dev_init(struct rtrs_ib_dev *dev)
{
	if (!(dev->ib_dev->attrs.device_cap_flags &
	      IB_DEVICE_MEM_MGT_EXTENSIONS)) {
		pr_err("Memory registrations not supported.\n");
		return -ENOTSUPP;
	}

	return 0;
}

static const struct rtrs_rdma_dev_pd_ops dev_pd_ops = {
	.init = rtrs_clt_ib_dev_init
};
static int __init rtrs_client_init(void)
{
	rtrs_rdma_dev_pd_init(0, &dev_pd);

	rtrs_clt_dev_class = class_create(THIS_MODULE, "rtrs-client");
	if (IS_ERR(rtrs_clt_dev_class)) {
		pr_err("Failed to create rtrs-client dev class\n");
		return PTR_ERR(rtrs_clt_dev_class);
	}
	rtrs_wq = alloc_workqueue("rtrs_client_wq", 0, 0);
	if (!rtrs_wq) {
		class_destroy(rtrs_clt_dev_class);
		return -ENOMEM;
	}

	return 0;
}

static void __exit rtrs_client_exit(void)
{
	destroy_workqueue(rtrs_wq);
	class_destroy(rtrs_clt_dev_class);
	rtrs_rdma_dev_pd_deinit(&dev_pd);
}

module_init(rtrs_client_init);
module_exit(rtrs_client_exit);