drivers/infiniband/ulp/rtrs/rtrs-clt.c  [linux/fpc-iii.git]
blob 67f86c405a265a623f79d761a5b4b806d5bb45cc (as of merge tag 'block-5.11-2021-01-10' of git://git.kernel.dk/linux-block)
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /*
3 * RDMA Transport Layer
5 * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
6 * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
7 * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
8 */
10 #undef pr_fmt
11 #define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt
13 #include <linux/module.h>
14 #include <linux/rculist.h>
15 #include <linux/random.h>
17 #include "rtrs-clt.h"
18 #include "rtrs-log.h"
20 #define RTRS_CONNECT_TIMEOUT_MS 30000
22 * Wait a bit before trying to reconnect after a failure
23 * in order to give the server time to finish its cleanup, which
24 * otherwise leads to "false positive" failed reconnect attempts
26 #define RTRS_RECONNECT_BACKOFF 1000
28 * Wait for additional random time between 0 and 8 seconds
29 * before starting to reconnect to avoid clients reconnecting
30 * all at once in case of a major network outage
32 #define RTRS_RECONNECT_SEED 8
34 MODULE_DESCRIPTION("RDMA Transport Client");
35 MODULE_LICENSE("GPL");
37 static const struct rtrs_rdma_dev_pd_ops dev_pd_ops;
38 static struct rtrs_rdma_dev_pd dev_pd = {
39 .ops = &dev_pd_ops
42 static struct workqueue_struct *rtrs_wq;
43 static struct class *rtrs_clt_dev_class;
45 static inline bool rtrs_clt_is_connected(const struct rtrs_clt *clt)
47 struct rtrs_clt_sess *sess;
48 bool connected = false;
50 rcu_read_lock();
51 list_for_each_entry_rcu(sess, &clt->paths_list, s.entry)
52 connected |= READ_ONCE(sess->state) == RTRS_CLT_CONNECTED;
53 rcu_read_unlock();
55 return connected;
58 static struct rtrs_permit *
59 __rtrs_get_permit(struct rtrs_clt *clt, enum rtrs_clt_con_type con_type)
61 size_t max_depth = clt->queue_depth;
62 struct rtrs_permit *permit;
63 int bit;
66 * Adapted from null_blk get_tag(). Callers from different cpus may
67 * grab the same bit, since find_first_zero_bit is not atomic.
68 * But then the test_and_set_bit_lock will fail for all the
69 * callers but one, so that they will loop again.
70 * This way an explicit spinlock is not required.
72 do {
73 bit = find_first_zero_bit(clt->permits_map, max_depth);
74 if (unlikely(bit >= max_depth))
75 return NULL;
76 } while (unlikely(test_and_set_bit_lock(bit, clt->permits_map)));
78 permit = get_permit(clt, bit);
79 WARN_ON(permit->mem_id != bit);
80 permit->cpu_id = raw_smp_processor_id();
81 permit->con_type = con_type;
83 return permit;
86 static inline void __rtrs_put_permit(struct rtrs_clt *clt,
87 struct rtrs_permit *permit)
89 clear_bit_unlock(permit->mem_id, clt->permits_map);
92 /**
93 * rtrs_clt_get_permit() - allocates permit for future RDMA operation
94 * @clt: Current session
95 * @con_type: Type of connection to use with the permit
96 * @can_wait: Wait type
98 * Description:
99 * Allocates a permit for the following RDMA operation. The permit is used
100 * to preallocate all resources and to propagate memory pressure
101 * up earlier.
103 * Context:
104 * Can sleep if @can_wait == RTRS_PERMIT_WAIT
106 struct rtrs_permit *rtrs_clt_get_permit(struct rtrs_clt *clt,
107 enum rtrs_clt_con_type con_type,
108 int can_wait)
110 struct rtrs_permit *permit;
111 DEFINE_WAIT(wait);
113 permit = __rtrs_get_permit(clt, con_type);
114 if (likely(permit) || !can_wait)
115 return permit;
117 do {
118 prepare_to_wait(&clt->permits_wait, &wait,
119 TASK_UNINTERRUPTIBLE);
120 permit = __rtrs_get_permit(clt, con_type);
121 if (likely(permit))
122 break;
124 io_schedule();
125 } while (1);
127 finish_wait(&clt->permits_wait, &wait);
129 return permit;
131 EXPORT_SYMBOL(rtrs_clt_get_permit);
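/*
 * Illustrative caller pattern (hypothetical, not taken from this file):
 *
 *	permit = rtrs_clt_get_permit(clt, RTRS_IO_CON, RTRS_PERMIT_WAIT);
 *	if (!permit)
 *		return -EBUSY;	(only possible with RTRS_PERMIT_NOWAIT)
 *	... issue the IO, e.g. via rtrs_clt_request() ...
 *	rtrs_clt_put_permit(clt, permit);
 */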
134 * rtrs_clt_put_permit() - puts allocated permit
135 * @clt: Current session
136 * @permit: Permit to be freed
138 * Context:
139 * Does not matter
141 void rtrs_clt_put_permit(struct rtrs_clt *clt, struct rtrs_permit *permit)
143 if (WARN_ON(!test_bit(permit->mem_id, clt->permits_map)))
144 return;
146 __rtrs_put_permit(clt, permit);
149 * rtrs_clt_get_permit() adds itself to the &clt->permits_wait list
150 * before calling schedule(). So if rtrs_clt_get_permit() is sleeping
151 * it must have added itself to &clt->permits_wait before
152 * __rtrs_put_permit() finished.
153 * Hence it is safe to guard wake_up() with a waitqueue_active() test.
155 if (waitqueue_active(&clt->permits_wait))
156 wake_up(&clt->permits_wait);
158 EXPORT_SYMBOL(rtrs_clt_put_permit);
161 * rtrs_permit_to_clt_con() - returns RDMA connection pointer by the permit
162 * @sess: client session pointer
163 * @permit: permit for the allocation of the RDMA buffer
164 * Note:
165 * IO connections start from 1.
166 * Connection 0 is reserved for user (service) messages.
168 static
169 struct rtrs_clt_con *rtrs_permit_to_clt_con(struct rtrs_clt_sess *sess,
170 struct rtrs_permit *permit)
172 int id = 0;
174 if (likely(permit->con_type == RTRS_IO_CON))
175 id = (permit->cpu_id % (sess->s.con_num - 1)) + 1;
177 return to_clt_con(sess->s.con[id]);
181 * __rtrs_clt_change_state() - change the session state through session state
182 * machine.
184 * @sess: client session to change the state of.
185 * @new_state: state to change to.
187 * Returns true if successful, false if the requested state cannot be set.
189 * Locks:
190 * state_wq lock must be held.
192 static bool __rtrs_clt_change_state(struct rtrs_clt_sess *sess,
193 enum rtrs_clt_state new_state)
195 enum rtrs_clt_state old_state;
196 bool changed = false;
198 lockdep_assert_held(&sess->state_wq.lock);
200 old_state = sess->state;
201 switch (new_state) {
202 case RTRS_CLT_CONNECTING:
203 switch (old_state) {
204 case RTRS_CLT_RECONNECTING:
205 changed = true;
206 fallthrough;
207 default:
208 break;
210 break;
211 case RTRS_CLT_RECONNECTING:
212 switch (old_state) {
213 case RTRS_CLT_CONNECTED:
214 case RTRS_CLT_CONNECTING_ERR:
215 case RTRS_CLT_CLOSED:
216 changed = true;
217 fallthrough;
218 default:
219 break;
221 break;
222 case RTRS_CLT_CONNECTED:
223 switch (old_state) {
224 case RTRS_CLT_CONNECTING:
225 changed = true;
226 fallthrough;
227 default:
228 break;
230 break;
231 case RTRS_CLT_CONNECTING_ERR:
232 switch (old_state) {
233 case RTRS_CLT_CONNECTING:
234 changed = true;
235 fallthrough;
236 default:
237 break;
239 break;
240 case RTRS_CLT_CLOSING:
241 switch (old_state) {
242 case RTRS_CLT_CONNECTING:
243 case RTRS_CLT_CONNECTING_ERR:
244 case RTRS_CLT_RECONNECTING:
245 case RTRS_CLT_CONNECTED:
246 changed = true;
247 fallthrough;
248 default:
249 break;
251 break;
252 case RTRS_CLT_CLOSED:
253 switch (old_state) {
254 case RTRS_CLT_CLOSING:
255 changed = true;
256 fallthrough;
257 default:
258 break;
260 break;
261 case RTRS_CLT_DEAD:
262 switch (old_state) {
263 case RTRS_CLT_CLOSED:
264 changed = true;
265 fallthrough;
266 default:
267 break;
269 break;
270 default:
271 break;
273 if (changed) {
274 sess->state = new_state;
275 wake_up_locked(&sess->state_wq);
278 return changed;
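/*
 * Summary of the transitions allowed by the switch above:
 *	RECONNECTING                        -> CONNECTING
 *	CONNECTED, CONNECTING_ERR, CLOSED   -> RECONNECTING
 *	CONNECTING                          -> CONNECTED, CONNECTING_ERR
 *	CONNECTING, CONNECTING_ERR,
 *	RECONNECTING, CONNECTED             -> CLOSING
 *	CLOSING                             -> CLOSED
 *	CLOSED                              -> DEAD
 */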
281 static bool rtrs_clt_change_state_from_to(struct rtrs_clt_sess *sess,
282 enum rtrs_clt_state old_state,
283 enum rtrs_clt_state new_state)
285 bool changed = false;
287 spin_lock_irq(&sess->state_wq.lock);
288 if (sess->state == old_state)
289 changed = __rtrs_clt_change_state(sess, new_state);
290 spin_unlock_irq(&sess->state_wq.lock);
292 return changed;
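/*
 * rtrs_rdma_error_recovery() - if the session was CONNECTED, schedule a
 * delayed reconnect; otherwise flag CONNECTING -> CONNECTING_ERR so that the
 * waiter on the connect path cleans up and retries.
 */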
295 static void rtrs_rdma_error_recovery(struct rtrs_clt_con *con)
297 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
299 if (rtrs_clt_change_state_from_to(sess,
300 RTRS_CLT_CONNECTED,
301 RTRS_CLT_RECONNECTING)) {
302 struct rtrs_clt *clt = sess->clt;
303 unsigned int delay_ms;
306 * Normal scenario, reconnect if we were successfully connected
308 delay_ms = clt->reconnect_delay_sec * 1000;
309 queue_delayed_work(rtrs_wq, &sess->reconnect_dwork,
310 msecs_to_jiffies(delay_ms +
311 prandom_u32() % RTRS_RECONNECT_SEED));
312 } else {
314 * An error can happen only while establishing a new connection,
315 * so notify the waiter with the error state; the waiter is responsible
316 * for cleaning up the rest and reconnecting if needed.
318 rtrs_clt_change_state_from_to(sess,
319 RTRS_CLT_CONNECTING,
320 RTRS_CLT_CONNECTING_ERR);
324 static void rtrs_clt_fast_reg_done(struct ib_cq *cq, struct ib_wc *wc)
326 struct rtrs_clt_con *con = cq->cq_context;
328 if (unlikely(wc->status != IB_WC_SUCCESS)) {
329 rtrs_err(con->c.sess, "Failed IB_WR_REG_MR: %s\n",
330 ib_wc_status_msg(wc->status));
331 rtrs_rdma_error_recovery(con);
335 static struct ib_cqe fast_reg_cqe = {
336 .done = rtrs_clt_fast_reg_done
339 static void complete_rdma_req(struct rtrs_clt_io_req *req, int errno,
340 bool notify, bool can_wait);
342 static void rtrs_clt_inv_rkey_done(struct ib_cq *cq, struct ib_wc *wc)
344 struct rtrs_clt_io_req *req =
345 container_of(wc->wr_cqe, typeof(*req), inv_cqe);
346 struct rtrs_clt_con *con = cq->cq_context;
348 if (unlikely(wc->status != IB_WC_SUCCESS)) {
349 rtrs_err(con->c.sess, "Failed IB_WR_LOCAL_INV: %s\n",
350 ib_wc_status_msg(wc->status));
351 rtrs_rdma_error_recovery(con);
353 req->need_inv = false;
354 if (likely(req->need_inv_comp))
355 complete(&req->inv_comp);
356 else
357 /* Complete request from INV callback */
358 complete_rdma_req(req, req->inv_errno, true, false);
361 static int rtrs_inv_rkey(struct rtrs_clt_io_req *req)
363 struct rtrs_clt_con *con = req->con;
364 struct ib_send_wr wr = {
365 .opcode = IB_WR_LOCAL_INV,
366 .wr_cqe = &req->inv_cqe,
367 .send_flags = IB_SEND_SIGNALED,
368 .ex.invalidate_rkey = req->mr->rkey,
370 req->inv_cqe.done = rtrs_clt_inv_rkey_done;
372 return ib_post_send(con->c.qp, &wr, NULL);
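/*
 * complete_rdma_req() - finish an IO request: locally invalidate the rkey if
 * the server did not, unmap the scatterlist, decrement the inflight counter
 * for the MIN_INFLIGHT policy and, if @notify is set, call the upper layer
 * confirmation callback with @errno.
 */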
375 static void complete_rdma_req(struct rtrs_clt_io_req *req, int errno,
376 bool notify, bool can_wait)
378 struct rtrs_clt_con *con = req->con;
379 struct rtrs_clt_sess *sess;
380 int err;
382 if (WARN_ON(!req->in_use))
383 return;
384 if (WARN_ON(!req->con))
385 return;
386 sess = to_clt_sess(con->c.sess);
388 if (req->sg_cnt) {
389 if (unlikely(req->dir == DMA_FROM_DEVICE && req->need_inv)) {
391 * We are here to invalidate read requests
392 * ourselves. In the normal scenario the server should
393 * send an INV for all read requests, but since
394 * we are here, two things could have happened:
396 * 1. this is a failover, when errno != 0
397 * and can_wait == 1,
399 * 2. something went totally wrong and the
400 * server forgot to send the INV, so we
401 * should do that ourselves.
404 if (likely(can_wait)) {
405 req->need_inv_comp = true;
406 } else {
407 /* This should be IO path, so always notify */
408 WARN_ON(!notify);
409 /* Save errno for INV callback */
410 req->inv_errno = errno;
413 err = rtrs_inv_rkey(req);
414 if (unlikely(err)) {
415 rtrs_err(con->c.sess, "Send INV WR key=%#x: %d\n",
416 req->mr->rkey, err);
417 } else if (likely(can_wait)) {
418 wait_for_completion(&req->inv_comp);
419 } else {
421 * Something went wrong, so request will be
422 * completed from INV callback.
424 WARN_ON_ONCE(1);
426 return;
429 ib_dma_unmap_sg(sess->s.dev->ib_dev, req->sglist,
430 req->sg_cnt, req->dir);
432 if (sess->clt->mp_policy == MP_POLICY_MIN_INFLIGHT)
433 atomic_dec(&sess->stats->inflight);
435 req->in_use = false;
436 req->con = NULL;
438 if (notify)
439 req->conf(req->priv, errno);
442 static int rtrs_post_send_rdma(struct rtrs_clt_con *con,
443 struct rtrs_clt_io_req *req,
444 struct rtrs_rbuf *rbuf, u32 off,
445 u32 imm, struct ib_send_wr *wr)
447 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
448 enum ib_send_flags flags;
449 struct ib_sge sge;
451 if (unlikely(!req->sg_size)) {
452 rtrs_wrn(con->c.sess,
453 "Doing RDMA Write failed, no data supplied\n");
454 return -EINVAL;
457 /* user data and user message in the first list element */
458 sge.addr = req->iu->dma_addr;
459 sge.length = req->sg_size;
460 sge.lkey = sess->s.dev->ib_pd->local_dma_lkey;
463 * From time to time we have to post signalled sends,
464 * otherwise the send queue will fill up and only a QP reset can help.
466 flags = atomic_inc_return(&con->io_cnt) % sess->queue_depth ?
467 0 : IB_SEND_SIGNALED;
469 ib_dma_sync_single_for_device(sess->s.dev->ib_dev, req->iu->dma_addr,
470 req->sg_size, DMA_TO_DEVICE);
472 return rtrs_iu_post_rdma_write_imm(&con->c, req->iu, &sge, 1,
473 rbuf->rkey, rbuf->addr + off,
474 imm, flags, wr);
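/*
 * process_io_rsp() - look up the request by @msg_id, drop the local
 * invalidation requirement if the server already invalidated the rkey
 * (@w_inval), and complete the request with the server-provided @errno.
 */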
477 static void process_io_rsp(struct rtrs_clt_sess *sess, u32 msg_id,
478 s16 errno, bool w_inval)
480 struct rtrs_clt_io_req *req;
482 if (WARN_ON(msg_id >= sess->queue_depth))
483 return;
485 req = &sess->reqs[msg_id];
486 /* Drop need_inv if server responded with send with invalidation */
487 req->need_inv &= !w_inval;
488 complete_rdma_req(req, errno, true, false);
491 static void rtrs_clt_recv_done(struct rtrs_clt_con *con, struct ib_wc *wc)
493 struct rtrs_iu *iu;
494 int err;
495 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
497 WARN_ON(sess->flags != RTRS_MSG_NEW_RKEY_F);
498 iu = container_of(wc->wr_cqe, struct rtrs_iu,
499 cqe);
500 err = rtrs_iu_post_recv(&con->c, iu);
501 if (unlikely(err)) {
502 rtrs_err(con->c.sess, "post iu failed %d\n", err);
503 rtrs_rdma_error_recovery(con);
507 static void rtrs_clt_rkey_rsp_done(struct rtrs_clt_con *con, struct ib_wc *wc)
509 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
510 struct rtrs_msg_rkey_rsp *msg;
511 u32 imm_type, imm_payload;
512 bool w_inval = false;
513 struct rtrs_iu *iu;
514 u32 buf_id;
515 int err;
517 WARN_ON(sess->flags != RTRS_MSG_NEW_RKEY_F);
519 iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
521 if (unlikely(wc->byte_len < sizeof(*msg))) {
522 rtrs_err(con->c.sess, "rkey response is malformed: size %d\n",
523 wc->byte_len);
524 goto out;
526 ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, iu->dma_addr,
527 iu->size, DMA_FROM_DEVICE);
528 msg = iu->buf;
529 if (unlikely(le16_to_cpu(msg->type) != RTRS_MSG_RKEY_RSP)) {
530 rtrs_err(sess->clt, "rkey response is malformed: type %d\n",
531 le16_to_cpu(msg->type));
532 goto out;
534 buf_id = le16_to_cpu(msg->buf_id);
535 if (WARN_ON(buf_id >= sess->queue_depth))
536 goto out;
538 rtrs_from_imm(be32_to_cpu(wc->ex.imm_data), &imm_type, &imm_payload);
539 if (likely(imm_type == RTRS_IO_RSP_IMM ||
540 imm_type == RTRS_IO_RSP_W_INV_IMM)) {
541 u32 msg_id;
543 w_inval = (imm_type == RTRS_IO_RSP_W_INV_IMM);
544 rtrs_from_io_rsp_imm(imm_payload, &msg_id, &err);
546 if (WARN_ON(buf_id != msg_id))
547 goto out;
548 sess->rbufs[buf_id].rkey = le32_to_cpu(msg->rkey);
549 process_io_rsp(sess, msg_id, err, w_inval);
551 ib_dma_sync_single_for_device(sess->s.dev->ib_dev, iu->dma_addr,
552 iu->size, DMA_FROM_DEVICE);
553 return rtrs_clt_recv_done(con, wc);
554 out:
555 rtrs_rdma_error_recovery(con);
558 static void rtrs_clt_rdma_done(struct ib_cq *cq, struct ib_wc *wc);
560 static struct ib_cqe io_comp_cqe = {
561 .done = rtrs_clt_rdma_done
565 * Post x2 empty WRs: first is for this RDMA with IMM,
566 * second is for RECV with INV, which happened earlier.
568 static int rtrs_post_recv_empty_x2(struct rtrs_con *con, struct ib_cqe *cqe)
570 struct ib_recv_wr wr_arr[2], *wr;
571 int i;
573 memset(wr_arr, 0, sizeof(wr_arr));
574 for (i = 0; i < ARRAY_SIZE(wr_arr); i++) {
575 wr = &wr_arr[i];
576 wr->wr_cqe = cqe;
577 if (i)
578 /* Chain backwards */
579 wr->next = &wr_arr[i - 1];
582 return ib_post_recv(con->qp, wr, NULL);
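/*
 * rtrs_clt_rdma_done() - main completion handler of the IO path: dispatches
 * IO responses carried in immediate data, heartbeat messages and rkey
 * responses, reposts receive buffers and triggers error recovery on failed
 * work completions.
 */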
585 static void rtrs_clt_rdma_done(struct ib_cq *cq, struct ib_wc *wc)
587 struct rtrs_clt_con *con = cq->cq_context;
588 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
589 u32 imm_type, imm_payload;
590 bool w_inval = false;
591 int err;
593 if (unlikely(wc->status != IB_WC_SUCCESS)) {
594 if (wc->status != IB_WC_WR_FLUSH_ERR) {
595 rtrs_err(sess->clt, "RDMA failed: %s\n",
596 ib_wc_status_msg(wc->status));
597 rtrs_rdma_error_recovery(con);
599 return;
601 rtrs_clt_update_wc_stats(con);
603 switch (wc->opcode) {
604 case IB_WC_RECV_RDMA_WITH_IMM:
606 * post_recv() RDMA write completions of IO reqs (read/write)
607 * and hb
609 if (WARN_ON(wc->wr_cqe->done != rtrs_clt_rdma_done))
610 return;
611 rtrs_from_imm(be32_to_cpu(wc->ex.imm_data),
612 &imm_type, &imm_payload);
613 if (likely(imm_type == RTRS_IO_RSP_IMM ||
614 imm_type == RTRS_IO_RSP_W_INV_IMM)) {
615 u32 msg_id;
617 w_inval = (imm_type == RTRS_IO_RSP_W_INV_IMM);
618 rtrs_from_io_rsp_imm(imm_payload, &msg_id, &err);
620 process_io_rsp(sess, msg_id, err, w_inval);
621 } else if (imm_type == RTRS_HB_MSG_IMM) {
622 WARN_ON(con->c.cid);
623 rtrs_send_hb_ack(&sess->s);
624 if (sess->flags == RTRS_MSG_NEW_RKEY_F)
625 return rtrs_clt_recv_done(con, wc);
626 } else if (imm_type == RTRS_HB_ACK_IMM) {
627 WARN_ON(con->c.cid);
628 sess->s.hb_missed_cnt = 0;
629 if (sess->flags == RTRS_MSG_NEW_RKEY_F)
630 return rtrs_clt_recv_done(con, wc);
631 } else {
632 rtrs_wrn(con->c.sess, "Unknown IMM type %u\n",
633 imm_type);
635 if (w_inval)
637 * Post x2 empty WRs: first is for this RDMA with IMM,
638 * second is for RECV with INV, which happened earlier.
640 err = rtrs_post_recv_empty_x2(&con->c, &io_comp_cqe);
641 else
642 err = rtrs_post_recv_empty(&con->c, &io_comp_cqe);
643 if (unlikely(err)) {
644 rtrs_err(con->c.sess, "rtrs_post_recv_empty(): %d\n",
645 err);
646 rtrs_rdma_error_recovery(con);
647 break;
649 break;
650 case IB_WC_RECV:
652 * Key invalidations from server side
654 WARN_ON(!(wc->wc_flags & IB_WC_WITH_INVALIDATE ||
655 wc->wc_flags & IB_WC_WITH_IMM));
656 WARN_ON(wc->wr_cqe->done != rtrs_clt_rdma_done);
657 if (sess->flags == RTRS_MSG_NEW_RKEY_F) {
658 if (wc->wc_flags & IB_WC_WITH_INVALIDATE)
659 return rtrs_clt_recv_done(con, wc);
661 return rtrs_clt_rkey_rsp_done(con, wc);
663 break;
664 case IB_WC_RDMA_WRITE:
666 * post_send() RDMA write completions of IO reqs (read/write)
667 * and hb
669 break;
671 default:
672 rtrs_wrn(sess->clt, "Unexpected WC type: %d\n", wc->opcode);
673 return;
677 static int post_recv_io(struct rtrs_clt_con *con, size_t q_size)
679 int err, i;
680 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
682 for (i = 0; i < q_size; i++) {
683 if (sess->flags == RTRS_MSG_NEW_RKEY_F) {
684 struct rtrs_iu *iu = &con->rsp_ius[i];
686 err = rtrs_iu_post_recv(&con->c, iu);
687 } else {
688 err = rtrs_post_recv_empty(&con->c, &io_comp_cqe);
690 if (unlikely(err))
691 return err;
694 return 0;
697 static int post_recv_sess(struct rtrs_clt_sess *sess)
699 size_t q_size = 0;
700 int err, cid;
702 for (cid = 0; cid < sess->s.con_num; cid++) {
703 if (cid == 0)
704 q_size = SERVICE_CON_QUEUE_DEPTH;
705 else
706 q_size = sess->queue_depth;
709 * x2 for RDMA read responses + FR key invalidations,
710 * RDMA writes do not require any FR registrations.
712 q_size *= 2;
714 err = post_recv_io(to_clt_con(sess->s.con[cid]), q_size);
715 if (unlikely(err)) {
716 rtrs_err(sess->clt, "post_recv_io(), err: %d\n", err);
717 return err;
721 return 0;
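/*
 * struct path_it - iterator used to walk the paths list according to the
 * configured multipath policy (round-robin or min-inflight).
 */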
724 struct path_it {
725 int i;
726 struct list_head skip_list;
727 struct rtrs_clt *clt;
728 struct rtrs_clt_sess *(*next_path)(struct path_it *it);
732 * list_next_or_null_rr_rcu - get next list element in round-robin fashion.
733 * @head: the head for the list.
734 * @ptr: the list head to take the next element from.
735 * @type: the type of the struct this is embedded in.
736 * @memb: the name of the list_head within the struct.
738 * The next element is returned in round-robin fashion, i.e. the head will be
739 * skipped, but if the list is observed as empty, NULL will be returned.
741 * This primitive may safely run concurrently with the _rcu list-mutation
742 * primitives such as list_add_rcu() as long as it's guarded by rcu_read_lock().
744 #define list_next_or_null_rr_rcu(head, ptr, type, memb) \
745 ({ \
746 list_next_or_null_rcu(head, ptr, type, memb) ?: \
747 list_next_or_null_rcu(head, READ_ONCE((ptr)->next), \
748 type, memb); \
752 * get_next_path_rr() - Returns path in round-robin fashion.
753 * @it: the path pointer
755 * Related to @MP_POLICY_RR
757 * Locks:
758 * rcu_read_lock() must be held.
760 static struct rtrs_clt_sess *get_next_path_rr(struct path_it *it)
762 struct rtrs_clt_sess __rcu **ppcpu_path;
763 struct rtrs_clt_sess *path;
764 struct rtrs_clt *clt;
766 clt = it->clt;
769 * Here we use two RCU objects: @paths_list and @pcpu_path
770 * pointer. See rtrs_clt_remove_path_from_arr() for details
771 * how that is handled.
774 ppcpu_path = this_cpu_ptr(clt->pcpu_path);
775 path = rcu_dereference(*ppcpu_path);
776 if (unlikely(!path))
777 path = list_first_or_null_rcu(&clt->paths_list,
778 typeof(*path), s.entry);
779 else
780 path = list_next_or_null_rr_rcu(&clt->paths_list,
781 &path->s.entry,
782 typeof(*path),
783 s.entry);
784 rcu_assign_pointer(*ppcpu_path, path);
786 return path;
790 * get_next_path_min_inflight() - Returns path with minimal inflight count.
791 * @it: the path pointer
793 * Related to @MP_POLICY_MIN_INFLIGHT
795 * Locks:
796 * rcu_read_lock() must be held.
798 static struct rtrs_clt_sess *get_next_path_min_inflight(struct path_it *it)
800 struct rtrs_clt_sess *min_path = NULL;
801 struct rtrs_clt *clt = it->clt;
802 struct rtrs_clt_sess *sess;
803 int min_inflight = INT_MAX;
804 int inflight;
806 list_for_each_entry_rcu(sess, &clt->paths_list, s.entry) {
807 if (unlikely(!list_empty(raw_cpu_ptr(sess->mp_skip_entry))))
808 continue;
810 inflight = atomic_read(&sess->stats->inflight);
812 if (inflight < min_inflight) {
813 min_inflight = inflight;
814 min_path = sess;
819 * add the path to the skip list, so that next time we can get
820 * a different one
822 if (min_path)
823 list_add(raw_cpu_ptr(min_path->mp_skip_entry), &it->skip_list);
825 return min_path;
828 static inline void path_it_init(struct path_it *it, struct rtrs_clt *clt)
830 INIT_LIST_HEAD(&it->skip_list);
831 it->clt = clt;
832 it->i = 0;
834 if (clt->mp_policy == MP_POLICY_RR)
835 it->next_path = get_next_path_rr;
836 else
837 it->next_path = get_next_path_min_inflight;
840 static inline void path_it_deinit(struct path_it *it)
842 struct list_head *skip, *tmp;
844 * The skip_list is used only for the MIN_INFLIGHT policy.
845 * We need to remove paths from it, so that the next IO can insert
846 * paths (->mp_skip_entry) into a skip_list again.
848 list_for_each_safe(skip, tmp, &it->skip_list)
849 list_del_init(skip);
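/*
 * Illustrative iteration over the paths (the real users, e.g.
 * rtrs_clt_failover_req() below, follow the same pattern):
 *
 *	rcu_read_lock();
 *	for (path_it_init(&it, clt);
 *	     (sess = it.next_path(&it)) && it.i < it.clt->paths_num;
 *	     it.i++) {
 *		... try to use @sess if it is CONNECTED ...
 *	}
 *	path_it_deinit(&it);
 *	rcu_read_unlock();
 */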
853 * rtrs_clt_init_req() - Initialize an rtrs_clt_io_req holding information
854 * about an inflight IO.
855 * The user buffer holding user control message (not data) is copied into
856 * the corresponding buffer of rtrs_iu (req->iu->buf), which later on will
857 * also hold the control message of rtrs.
858 * @req: an io request holding information about IO.
859 * @sess: client session
860 * @conf: confirmation callback function to notify upper layer.
861 * @permit: permit for allocation of RDMA remote buffer
862 * @priv: private pointer
863 * @vec: kernel vector containing control message
864 * @usr_len: length of the user message
865 * @sg: scatter list for IO data
866 * @sg_cnt: number of scatter list entries
867 * @data_len: length of the IO data
868 * @dir: direction of the IO.
870 static void rtrs_clt_init_req(struct rtrs_clt_io_req *req,
871 struct rtrs_clt_sess *sess,
872 void (*conf)(void *priv, int errno),
873 struct rtrs_permit *permit, void *priv,
874 const struct kvec *vec, size_t usr_len,
875 struct scatterlist *sg, size_t sg_cnt,
876 size_t data_len, int dir)
878 struct iov_iter iter;
879 size_t len;
881 req->permit = permit;
882 req->in_use = true;
883 req->usr_len = usr_len;
884 req->data_len = data_len;
885 req->sglist = sg;
886 req->sg_cnt = sg_cnt;
887 req->priv = priv;
888 req->dir = dir;
889 req->con = rtrs_permit_to_clt_con(sess, permit);
890 req->conf = conf;
891 req->need_inv = false;
892 req->need_inv_comp = false;
893 req->inv_errno = 0;
895 iov_iter_kvec(&iter, READ, vec, 1, usr_len);
896 len = _copy_from_iter(req->iu->buf, usr_len, &iter);
897 WARN_ON(len != usr_len);
899 reinit_completion(&req->inv_comp);
902 static struct rtrs_clt_io_req *
903 rtrs_clt_get_req(struct rtrs_clt_sess *sess,
904 void (*conf)(void *priv, int errno),
905 struct rtrs_permit *permit, void *priv,
906 const struct kvec *vec, size_t usr_len,
907 struct scatterlist *sg, size_t sg_cnt,
908 size_t data_len, int dir)
910 struct rtrs_clt_io_req *req;
912 req = &sess->reqs[permit->mem_id];
913 rtrs_clt_init_req(req, sess, conf, permit, priv, vec, usr_len,
914 sg, sg_cnt, data_len, dir);
915 return req;
918 static struct rtrs_clt_io_req *
919 rtrs_clt_get_copy_req(struct rtrs_clt_sess *alive_sess,
920 struct rtrs_clt_io_req *fail_req)
922 struct rtrs_clt_io_req *req;
923 struct kvec vec = {
924 .iov_base = fail_req->iu->buf,
925 .iov_len = fail_req->usr_len
928 req = &alive_sess->reqs[fail_req->permit->mem_id];
929 rtrs_clt_init_req(req, alive_sess, fail_req->conf, fail_req->permit,
930 fail_req->priv, &vec, fail_req->usr_len,
931 fail_req->sglist, fail_req->sg_cnt,
932 fail_req->data_len, fail_req->dir);
933 return req;
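/*
 * rtrs_post_rdma_write_sg() - post an RDMA WRITE with immediate carrying the
 * data scatterlist entries followed by one extra SGE with the rtrs header
 * and user message from req->iu.
 */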
936 static int rtrs_post_rdma_write_sg(struct rtrs_clt_con *con,
937 struct rtrs_clt_io_req *req,
938 struct rtrs_rbuf *rbuf,
939 u32 size, u32 imm)
941 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
942 struct ib_sge *sge = req->sge;
943 enum ib_send_flags flags;
944 struct scatterlist *sg;
945 size_t num_sge;
946 int i;
948 for_each_sg(req->sglist, sg, req->sg_cnt, i) {
949 sge[i].addr = sg_dma_address(sg);
950 sge[i].length = sg_dma_len(sg);
951 sge[i].lkey = sess->s.dev->ib_pd->local_dma_lkey;
953 sge[i].addr = req->iu->dma_addr;
954 sge[i].length = size;
955 sge[i].lkey = sess->s.dev->ib_pd->local_dma_lkey;
957 num_sge = 1 + req->sg_cnt;
960 * From time to time we have to post signalled sends,
961 * otherwise the send queue will fill up and only a QP reset can help.
963 flags = atomic_inc_return(&con->io_cnt) % sess->queue_depth ?
964 0 : IB_SEND_SIGNALED;
966 ib_dma_sync_single_for_device(sess->s.dev->ib_dev, req->iu->dma_addr,
967 size, DMA_TO_DEVICE);
969 return rtrs_iu_post_rdma_write_imm(&con->c, req->iu, sge, num_sge,
970 rbuf->rkey, rbuf->addr, imm,
971 flags, NULL);
974 static int rtrs_clt_write_req(struct rtrs_clt_io_req *req)
976 struct rtrs_clt_con *con = req->con;
977 struct rtrs_sess *s = con->c.sess;
978 struct rtrs_clt_sess *sess = to_clt_sess(s);
979 struct rtrs_msg_rdma_write *msg;
981 struct rtrs_rbuf *rbuf;
982 int ret, count = 0;
983 u32 imm, buf_id;
985 const size_t tsize = sizeof(*msg) + req->data_len + req->usr_len;
987 if (unlikely(tsize > sess->chunk_size)) {
988 rtrs_wrn(s, "Write request failed, size too big %zu > %d\n",
989 tsize, sess->chunk_size);
990 return -EMSGSIZE;
992 if (req->sg_cnt) {
993 count = ib_dma_map_sg(sess->s.dev->ib_dev, req->sglist,
994 req->sg_cnt, req->dir);
995 if (unlikely(!count)) {
996 rtrs_wrn(s, "Write request failed, map failed\n");
997 return -EINVAL;
1000 /* put rtrs msg after sg and user message */
1001 msg = req->iu->buf + req->usr_len;
1002 msg->type = cpu_to_le16(RTRS_MSG_WRITE);
1003 msg->usr_len = cpu_to_le16(req->usr_len);
1005 /* rtrs message on server side will be after user data and message */
1006 imm = req->permit->mem_off + req->data_len + req->usr_len;
1007 imm = rtrs_to_io_req_imm(imm);
1008 buf_id = req->permit->mem_id;
1009 req->sg_size = tsize;
1010 rbuf = &sess->rbufs[buf_id];
1013 * Update stats now, after request is successfully sent it is not
1014 * safe anymore to touch it.
1016 rtrs_clt_update_all_stats(req, WRITE);
1018 ret = rtrs_post_rdma_write_sg(req->con, req, rbuf,
1019 req->usr_len + sizeof(*msg),
1020 imm);
1021 if (unlikely(ret)) {
1022 rtrs_err(s, "Write request failed: %d\n", ret);
1023 if (sess->clt->mp_policy == MP_POLICY_MIN_INFLIGHT)
1024 atomic_dec(&sess->stats->inflight);
1025 if (req->sg_cnt)
1026 ib_dma_unmap_sg(sess->s.dev->ib_dev, req->sglist,
1027 req->sg_cnt, req->dir);
1030 return ret;
1033 static int rtrs_map_sg_fr(struct rtrs_clt_io_req *req, size_t count)
1035 int nr;
1037 /* Align the MR to a 4K page size to match the block virt boundary */
1038 nr = ib_map_mr_sg(req->mr, req->sglist, count, NULL, SZ_4K);
1039 if (nr < 0)
1040 return nr;
1041 if (unlikely(nr < req->sg_cnt))
1042 return -EINVAL;
1043 ib_update_fast_reg_key(req->mr, ib_inc_rkey(req->mr->rkey));
1045 return nr;
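/*
 * rtrs_clt_read_req() - map the scatterlist, register a fast-reg MR so the
 * server can RDMA WRITE the read data back, and send the RTRS_MSG_READ
 * request (carrying the MR descriptor) as an RDMA WRITE with immediate.
 */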
1048 static int rtrs_clt_read_req(struct rtrs_clt_io_req *req)
1050 struct rtrs_clt_con *con = req->con;
1051 struct rtrs_sess *s = con->c.sess;
1052 struct rtrs_clt_sess *sess = to_clt_sess(s);
1053 struct rtrs_msg_rdma_read *msg;
1054 struct rtrs_ib_dev *dev;
1056 struct ib_reg_wr rwr;
1057 struct ib_send_wr *wr = NULL;
1059 int ret, count = 0;
1060 u32 imm, buf_id;
1062 const size_t tsize = sizeof(*msg) + req->data_len + req->usr_len;
1064 s = &sess->s;
1065 dev = sess->s.dev;
1067 if (unlikely(tsize > sess->chunk_size)) {
1068 rtrs_wrn(s,
1069 "Read request failed, message size is %zu, bigger than CHUNK_SIZE %d\n",
1070 tsize, sess->chunk_size);
1071 return -EMSGSIZE;
1074 if (req->sg_cnt) {
1075 count = ib_dma_map_sg(dev->ib_dev, req->sglist, req->sg_cnt,
1076 req->dir);
1077 if (unlikely(!count)) {
1078 rtrs_wrn(s,
1079 "Read request failed, dma map failed\n");
1080 return -EINVAL;
1083 /* put our message into req->buf after user message */
1084 msg = req->iu->buf + req->usr_len;
1085 msg->type = cpu_to_le16(RTRS_MSG_READ);
1086 msg->usr_len = cpu_to_le16(req->usr_len);
1088 if (count) {
1089 ret = rtrs_map_sg_fr(req, count);
1090 if (ret < 0) {
1091 rtrs_err_rl(s,
1092 "Read request failed, failed to map fast reg. data, err: %d\n",
1093 ret);
1094 ib_dma_unmap_sg(dev->ib_dev, req->sglist, req->sg_cnt,
1095 req->dir);
1096 return ret;
1098 rwr = (struct ib_reg_wr) {
1099 .wr.opcode = IB_WR_REG_MR,
1100 .wr.wr_cqe = &fast_reg_cqe,
1101 .mr = req->mr,
1102 .key = req->mr->rkey,
1103 .access = (IB_ACCESS_LOCAL_WRITE |
1104 IB_ACCESS_REMOTE_WRITE),
1106 wr = &rwr.wr;
1108 msg->sg_cnt = cpu_to_le16(1);
1109 msg->flags = cpu_to_le16(RTRS_MSG_NEED_INVAL_F);
1111 msg->desc[0].addr = cpu_to_le64(req->mr->iova);
1112 msg->desc[0].key = cpu_to_le32(req->mr->rkey);
1113 msg->desc[0].len = cpu_to_le32(req->mr->length);
1115 /* Further invalidation is required */
1116 req->need_inv = !!RTRS_MSG_NEED_INVAL_F;
1118 } else {
1119 msg->sg_cnt = 0;
1120 msg->flags = 0;
1123 * rtrs message will be after the space reserved for disk data and
1124 * user message
1126 imm = req->permit->mem_off + req->data_len + req->usr_len;
1127 imm = rtrs_to_io_req_imm(imm);
1128 buf_id = req->permit->mem_id;
1130 req->sg_size = sizeof(*msg);
1131 req->sg_size += le16_to_cpu(msg->sg_cnt) * sizeof(struct rtrs_sg_desc);
1132 req->sg_size += req->usr_len;
1135 * Update stats now, after request is successfully sent it is not
1136 * safe anymore to touch it.
1138 rtrs_clt_update_all_stats(req, READ);
1140 ret = rtrs_post_send_rdma(req->con, req, &sess->rbufs[buf_id],
1141 req->data_len, imm, wr);
1142 if (unlikely(ret)) {
1143 rtrs_err(s, "Read request failed: %d\n", ret);
1144 if (sess->clt->mp_policy == MP_POLICY_MIN_INFLIGHT)
1145 atomic_dec(&sess->stats->inflight);
1146 req->need_inv = false;
1147 if (req->sg_cnt)
1148 ib_dma_unmap_sg(dev->ib_dev, req->sglist,
1149 req->sg_cnt, req->dir);
1152 return ret;
1156 * rtrs_clt_failover_req() - Try to find an active path for a failed request
1157 * @clt: clt context
1158 * @fail_req: a failed io request.
1160 static int rtrs_clt_failover_req(struct rtrs_clt *clt,
1161 struct rtrs_clt_io_req *fail_req)
1163 struct rtrs_clt_sess *alive_sess;
1164 struct rtrs_clt_io_req *req;
1165 int err = -ECONNABORTED;
1166 struct path_it it;
1168 rcu_read_lock();
1169 for (path_it_init(&it, clt);
1170 (alive_sess = it.next_path(&it)) && it.i < it.clt->paths_num;
1171 it.i++) {
1172 if (unlikely(READ_ONCE(alive_sess->state) !=
1173 RTRS_CLT_CONNECTED))
1174 continue;
1175 req = rtrs_clt_get_copy_req(alive_sess, fail_req);
1176 if (req->dir == DMA_TO_DEVICE)
1177 err = rtrs_clt_write_req(req);
1178 else
1179 err = rtrs_clt_read_req(req);
1180 if (unlikely(err)) {
1181 req->in_use = false;
1182 continue;
1184 /* Success path */
1185 rtrs_clt_inc_failover_cnt(alive_sess->stats);
1186 break;
1188 path_it_deinit(&it);
1189 rcu_read_unlock();
1191 return err;
1194 static void fail_all_outstanding_reqs(struct rtrs_clt_sess *sess)
1196 struct rtrs_clt *clt = sess->clt;
1197 struct rtrs_clt_io_req *req;
1198 int i, err;
1200 if (!sess->reqs)
1201 return;
1202 for (i = 0; i < sess->queue_depth; ++i) {
1203 req = &sess->reqs[i];
1204 if (!req->in_use)
1205 continue;
1208 * Safely (without notification) complete the failed request.
1209 * After completion this request is still usable and can
1210 * be failed over to another path.
1212 complete_rdma_req(req, -ECONNABORTED, false, true);
1214 err = rtrs_clt_failover_req(clt, req);
1215 if (unlikely(err))
1216 /* Failover failed, notify anyway */
1217 req->conf(req->priv, err);
1221 static void free_sess_reqs(struct rtrs_clt_sess *sess)
1223 struct rtrs_clt_io_req *req;
1224 int i;
1226 if (!sess->reqs)
1227 return;
1228 for (i = 0; i < sess->queue_depth; ++i) {
1229 req = &sess->reqs[i];
1230 if (req->mr)
1231 ib_dereg_mr(req->mr);
1232 kfree(req->sge);
1233 rtrs_iu_free(req->iu, sess->s.dev->ib_dev, 1);
1235 kfree(sess->reqs);
1236 sess->reqs = NULL;
1239 static int alloc_sess_reqs(struct rtrs_clt_sess *sess)
1241 struct rtrs_clt_io_req *req;
1242 struct rtrs_clt *clt = sess->clt;
1243 int i, err = -ENOMEM;
1245 sess->reqs = kcalloc(sess->queue_depth, sizeof(*sess->reqs),
1246 GFP_KERNEL);
1247 if (!sess->reqs)
1248 return -ENOMEM;
1250 for (i = 0; i < sess->queue_depth; ++i) {
1251 req = &sess->reqs[i];
1252 req->iu = rtrs_iu_alloc(1, sess->max_hdr_size, GFP_KERNEL,
1253 sess->s.dev->ib_dev,
1254 DMA_TO_DEVICE,
1255 rtrs_clt_rdma_done);
1256 if (!req->iu)
1257 goto out;
1259 req->sge = kmalloc_array(clt->max_segments + 1,
1260 sizeof(*req->sge), GFP_KERNEL);
1261 if (!req->sge)
1262 goto out;
1264 req->mr = ib_alloc_mr(sess->s.dev->ib_pd, IB_MR_TYPE_MEM_REG,
1265 sess->max_pages_per_mr);
1266 if (IS_ERR(req->mr)) {
1267 err = PTR_ERR(req->mr);
1268 req->mr = NULL;
1269 pr_err("Failed to alloc sess->max_pages_per_mr %d\n",
1270 sess->max_pages_per_mr);
1271 goto out;
1274 init_completion(&req->inv_comp);
1277 return 0;
1279 out:
1280 free_sess_reqs(sess);
1282 return err;
1285 static int alloc_permits(struct rtrs_clt *clt)
1287 unsigned int chunk_bits;
1288 int err, i;
1290 clt->permits_map = kcalloc(BITS_TO_LONGS(clt->queue_depth),
1291 sizeof(long), GFP_KERNEL);
1292 if (!clt->permits_map) {
1293 err = -ENOMEM;
1294 goto out_err;
1296 clt->permits = kcalloc(clt->queue_depth, permit_size(clt), GFP_KERNEL);
1297 if (!clt->permits) {
1298 err = -ENOMEM;
1299 goto err_map;
1301 chunk_bits = ilog2(clt->queue_depth - 1) + 1;
1302 for (i = 0; i < clt->queue_depth; i++) {
1303 struct rtrs_permit *permit;
1305 permit = get_permit(clt, i);
1306 permit->mem_id = i;
1307 permit->mem_off = i << (MAX_IMM_PAYL_BITS - chunk_bits);
1310 return 0;
1312 err_map:
1313 kfree(clt->permits_map);
1314 clt->permits_map = NULL;
1315 out_err:
1316 return err;
1319 static void free_permits(struct rtrs_clt *clt)
1321 kfree(clt->permits_map);
1322 clt->permits_map = NULL;
1323 kfree(clt->permits);
1324 clt->permits = NULL;
1327 static void query_fast_reg_mode(struct rtrs_clt_sess *sess)
1329 struct ib_device *ib_dev;
1330 u64 max_pages_per_mr;
1331 int mr_page_shift;
1333 ib_dev = sess->s.dev->ib_dev;
1336 * Use the smallest page size supported by the HCA, down to a
1337 * minimum of 4096 bytes. We're unlikely to build large sglists
1338 * out of smaller entries.
1340 mr_page_shift = max(12, ffs(ib_dev->attrs.page_size_cap) - 1);
1341 max_pages_per_mr = ib_dev->attrs.max_mr_size;
1342 do_div(max_pages_per_mr, (1ull << mr_page_shift));
1343 sess->max_pages_per_mr =
1344 min3(sess->max_pages_per_mr, (u32)max_pages_per_mr,
1345 ib_dev->attrs.max_fast_reg_page_list_len);
1346 sess->max_send_sge = ib_dev->attrs.max_send_sge;
1349 static bool rtrs_clt_change_state_get_old(struct rtrs_clt_sess *sess,
1350 enum rtrs_clt_state new_state,
1351 enum rtrs_clt_state *old_state)
1353 bool changed;
1355 spin_lock_irq(&sess->state_wq.lock);
1356 *old_state = sess->state;
1357 changed = __rtrs_clt_change_state(sess, new_state);
1358 spin_unlock_irq(&sess->state_wq.lock);
1360 return changed;
1363 static bool rtrs_clt_change_state(struct rtrs_clt_sess *sess,
1364 enum rtrs_clt_state new_state)
1366 enum rtrs_clt_state old_state;
1368 return rtrs_clt_change_state_get_old(sess, new_state, &old_state);
1371 static void rtrs_clt_hb_err_handler(struct rtrs_con *c)
1373 struct rtrs_clt_con *con = container_of(c, typeof(*con), c);
1375 rtrs_rdma_error_recovery(con);
1378 static void rtrs_clt_init_hb(struct rtrs_clt_sess *sess)
1380 rtrs_init_hb(&sess->s, &io_comp_cqe,
1381 RTRS_HB_INTERVAL_MS,
1382 RTRS_HB_MISSED_MAX,
1383 rtrs_clt_hb_err_handler,
1384 rtrs_wq);
1387 static void rtrs_clt_start_hb(struct rtrs_clt_sess *sess)
1389 rtrs_start_hb(&sess->s);
1392 static void rtrs_clt_stop_hb(struct rtrs_clt_sess *sess)
1394 rtrs_stop_hb(&sess->s);
1397 static void rtrs_clt_reconnect_work(struct work_struct *work);
1398 static void rtrs_clt_close_work(struct work_struct *work);
1400 static struct rtrs_clt_sess *alloc_sess(struct rtrs_clt *clt,
1401 const struct rtrs_addr *path,
1402 size_t con_num, u16 max_segments,
1403 size_t max_segment_size)
1405 struct rtrs_clt_sess *sess;
1406 int err = -ENOMEM;
1407 int cpu;
1409 sess = kzalloc(sizeof(*sess), GFP_KERNEL);
1410 if (!sess)
1411 goto err;
1413 /* Extra connection for user messages */
1414 con_num += 1;
1416 sess->s.con = kcalloc(con_num, sizeof(*sess->s.con), GFP_KERNEL);
1417 if (!sess->s.con)
1418 goto err_free_sess;
1420 sess->stats = kzalloc(sizeof(*sess->stats), GFP_KERNEL);
1421 if (!sess->stats)
1422 goto err_free_con;
1424 mutex_init(&sess->init_mutex);
1425 uuid_gen(&sess->s.uuid);
1426 memcpy(&sess->s.dst_addr, path->dst,
1427 rdma_addr_size((struct sockaddr *)path->dst));
1430 * rdma_resolve_addr() passes src_addr to cma_bind_addr, which
1431 * checks the sa_family to be non-zero. If user passed src_addr=NULL
1432 * the sess->src_addr will contain only zeros, which is then fine.
1434 if (path->src)
1435 memcpy(&sess->s.src_addr, path->src,
1436 rdma_addr_size((struct sockaddr *)path->src));
1437 strlcpy(sess->s.sessname, clt->sessname, sizeof(sess->s.sessname));
1438 sess->s.con_num = con_num;
1439 sess->clt = clt;
1440 sess->max_pages_per_mr = max_segments * max_segment_size >> 12;
1441 init_waitqueue_head(&sess->state_wq);
1442 sess->state = RTRS_CLT_CONNECTING;
1443 atomic_set(&sess->connected_cnt, 0);
1444 INIT_WORK(&sess->close_work, rtrs_clt_close_work);
1445 INIT_DELAYED_WORK(&sess->reconnect_dwork, rtrs_clt_reconnect_work);
1446 rtrs_clt_init_hb(sess);
1448 sess->mp_skip_entry = alloc_percpu(typeof(*sess->mp_skip_entry));
1449 if (!sess->mp_skip_entry)
1450 goto err_free_stats;
1452 for_each_possible_cpu(cpu)
1453 INIT_LIST_HEAD(per_cpu_ptr(sess->mp_skip_entry, cpu));
1455 err = rtrs_clt_init_stats(sess->stats);
1456 if (err)
1457 goto err_free_percpu;
1459 return sess;
1461 err_free_percpu:
1462 free_percpu(sess->mp_skip_entry);
1463 err_free_stats:
1464 kfree(sess->stats);
1465 err_free_con:
1466 kfree(sess->s.con);
1467 err_free_sess:
1468 kfree(sess);
1469 err:
1470 return ERR_PTR(err);
1473 void free_sess(struct rtrs_clt_sess *sess)
1475 free_percpu(sess->mp_skip_entry);
1476 mutex_destroy(&sess->init_mutex);
1477 kfree(sess->s.con);
1478 kfree(sess->rbufs);
1479 kfree(sess);
1482 static int create_con(struct rtrs_clt_sess *sess, unsigned int cid)
1484 struct rtrs_clt_con *con;
1486 con = kzalloc(sizeof(*con), GFP_KERNEL);
1487 if (!con)
1488 return -ENOMEM;
1490 /* Map first two connections to the first CPU */
1491 con->cpu = (cid ? cid - 1 : 0) % nr_cpu_ids;
1492 con->c.cid = cid;
1493 con->c.sess = &sess->s;
1494 atomic_set(&con->io_cnt, 0);
1495 mutex_init(&con->con_mutex);
1497 sess->s.con[cid] = &con->c;
1499 return 0;
1502 static void destroy_con(struct rtrs_clt_con *con)
1504 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
1506 sess->s.con[con->c.cid] = NULL;
1507 mutex_destroy(&con->con_mutex);
1508 kfree(con);
1511 static int create_con_cq_qp(struct rtrs_clt_con *con)
1513 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
1514 u16 wr_queue_size;
1515 int err, cq_vector;
1516 struct rtrs_msg_rkey_rsp *rsp;
1518 lockdep_assert_held(&con->con_mutex);
1519 if (con->c.cid == 0) {
1521 * One completion for each receive and two for each send
1522 * (send request + registration)
1523 * + 2 for drain and heartbeat
1524 * in case qp gets into error state
1526 wr_queue_size = SERVICE_CON_QUEUE_DEPTH * 3 + 2;
1527 /* We must be the first here */
1528 if (WARN_ON(sess->s.dev))
1529 return -EINVAL;
1532 * The whole session uses device from user connection.
1533 * Be careful not to close user connection before ib dev
1534 * is gracefully put.
1536 sess->s.dev = rtrs_ib_dev_find_or_add(con->c.cm_id->device,
1537 &dev_pd);
1538 if (!sess->s.dev) {
1539 rtrs_wrn(sess->clt,
1540 "rtrs_ib_dev_find_get_or_add(): no memory\n");
1541 return -ENOMEM;
1543 sess->s.dev_ref = 1;
1544 query_fast_reg_mode(sess);
1545 } else {
1547 * Here we assume that session members are correctly set.
1548 * This is always true if user connection (cid == 0) is
1549 * established first.
1551 if (WARN_ON(!sess->s.dev))
1552 return -EINVAL;
1553 if (WARN_ON(!sess->queue_depth))
1554 return -EINVAL;
1556 /* Shared between connections */
1557 sess->s.dev_ref++;
1558 wr_queue_size =
1559 min_t(int, sess->s.dev->ib_dev->attrs.max_qp_wr,
1560 /* QD * (REQ + RSP + FR REGS or INVS) + drain */
1561 sess->queue_depth * 3 + 1);
1563 /* alloc iu to recv new rkey reply when server reports flags set */
1564 if (sess->flags == RTRS_MSG_NEW_RKEY_F || con->c.cid == 0) {
1565 con->rsp_ius = rtrs_iu_alloc(wr_queue_size, sizeof(*rsp),
1566 GFP_KERNEL, sess->s.dev->ib_dev,
1567 DMA_FROM_DEVICE,
1568 rtrs_clt_rdma_done);
1569 if (!con->rsp_ius)
1570 return -ENOMEM;
1571 con->queue_size = wr_queue_size;
1573 cq_vector = con->cpu % sess->s.dev->ib_dev->num_comp_vectors;
1574 err = rtrs_cq_qp_create(&sess->s, &con->c, sess->max_send_sge,
1575 cq_vector, wr_queue_size, wr_queue_size,
1576 IB_POLL_SOFTIRQ);
1578 * In case of error we do not bother to clean previous allocations,
1579 * since destroy_con_cq_qp() must be called.
1581 return err;
1584 static void destroy_con_cq_qp(struct rtrs_clt_con *con)
1586 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
1589 * Be careful here: destroy_con_cq_qp() can be called even if
1590 * create_con_cq_qp() failed, see comments there.
1592 lockdep_assert_held(&con->con_mutex);
1593 rtrs_cq_qp_destroy(&con->c);
1594 if (con->rsp_ius) {
1595 rtrs_iu_free(con->rsp_ius, sess->s.dev->ib_dev, con->queue_size);
1596 con->rsp_ius = NULL;
1597 con->queue_size = 0;
1599 if (sess->s.dev_ref && !--sess->s.dev_ref) {
1600 rtrs_ib_dev_put(sess->s.dev);
1601 sess->s.dev = NULL;
1605 static void stop_cm(struct rtrs_clt_con *con)
1607 rdma_disconnect(con->c.cm_id);
1608 if (con->c.qp)
1609 ib_drain_qp(con->c.qp);
1612 static void destroy_cm(struct rtrs_clt_con *con)
1614 rdma_destroy_id(con->c.cm_id);
1615 con->c.cm_id = NULL;
1618 static int rtrs_rdma_addr_resolved(struct rtrs_clt_con *con)
1620 struct rtrs_sess *s = con->c.sess;
1621 int err;
1623 mutex_lock(&con->con_mutex);
1624 err = create_con_cq_qp(con);
1625 mutex_unlock(&con->con_mutex);
1626 if (err) {
1627 rtrs_err(s, "create_con_cq_qp(), err: %d\n", err);
1628 return err;
1630 err = rdma_resolve_route(con->c.cm_id, RTRS_CONNECT_TIMEOUT_MS);
1631 if (err)
1632 rtrs_err(s, "Resolving route failed, err: %d\n", err);
1634 return err;
1637 static int rtrs_rdma_route_resolved(struct rtrs_clt_con *con)
1639 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
1640 struct rtrs_clt *clt = sess->clt;
1641 struct rtrs_msg_conn_req msg;
1642 struct rdma_conn_param param;
1644 int err;
1646 param = (struct rdma_conn_param) {
1647 .retry_count = 7,
1648 .rnr_retry_count = 7,
1649 .private_data = &msg,
1650 .private_data_len = sizeof(msg),
1653 msg = (struct rtrs_msg_conn_req) {
1654 .magic = cpu_to_le16(RTRS_MAGIC),
1655 .version = cpu_to_le16(RTRS_PROTO_VER),
1656 .cid = cpu_to_le16(con->c.cid),
1657 .cid_num = cpu_to_le16(sess->s.con_num),
1658 .recon_cnt = cpu_to_le16(sess->s.recon_cnt),
1660 uuid_copy(&msg.sess_uuid, &sess->s.uuid);
1661 uuid_copy(&msg.paths_uuid, &clt->paths_uuid);
1663 err = rdma_connect_locked(con->c.cm_id, &param);
1664 if (err)
1665 rtrs_err(clt, "rdma_connect_locked(): %d\n", err);
1667 return err;
1670 static int rtrs_rdma_conn_established(struct rtrs_clt_con *con,
1671 struct rdma_cm_event *ev)
1673 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
1674 struct rtrs_clt *clt = sess->clt;
1675 const struct rtrs_msg_conn_rsp *msg;
1676 u16 version, queue_depth;
1677 int errno;
1678 u8 len;
1680 msg = ev->param.conn.private_data;
1681 len = ev->param.conn.private_data_len;
1682 if (len < sizeof(*msg)) {
1683 rtrs_err(clt, "Invalid RTRS connection response\n");
1684 return -ECONNRESET;
1686 if (le16_to_cpu(msg->magic) != RTRS_MAGIC) {
1687 rtrs_err(clt, "Invalid RTRS magic\n");
1688 return -ECONNRESET;
1690 version = le16_to_cpu(msg->version);
1691 if (version >> 8 != RTRS_PROTO_VER_MAJOR) {
1692 rtrs_err(clt, "Unsupported major RTRS version: %d, expected %d\n",
1693 version >> 8, RTRS_PROTO_VER_MAJOR);
1694 return -ECONNRESET;
1696 errno = le16_to_cpu(msg->errno);
1697 if (errno) {
1698 rtrs_err(clt, "Invalid RTRS message: errno %d\n",
1699 errno);
1700 return -ECONNRESET;
1702 if (con->c.cid == 0) {
1703 queue_depth = le16_to_cpu(msg->queue_depth);
1705 if (queue_depth > MAX_SESS_QUEUE_DEPTH) {
1706 rtrs_err(clt, "Invalid RTRS message: queue=%d\n",
1707 queue_depth);
1708 return -ECONNRESET;
1710 if (!sess->rbufs || sess->queue_depth < queue_depth) {
1711 kfree(sess->rbufs);
1712 sess->rbufs = kcalloc(queue_depth, sizeof(*sess->rbufs),
1713 GFP_KERNEL);
1714 if (!sess->rbufs)
1715 return -ENOMEM;
1717 sess->queue_depth = queue_depth;
1718 sess->max_hdr_size = le32_to_cpu(msg->max_hdr_size);
1719 sess->max_io_size = le32_to_cpu(msg->max_io_size);
1720 sess->flags = le32_to_cpu(msg->flags);
1721 sess->chunk_size = sess->max_io_size + sess->max_hdr_size;
1724 * Global queue depth and IO size are always a minimum.
1725 * If during a reconnection the server sends us a slightly
1726 * higher value, the client does not care and keeps using the cached minimum.
1728 * Since we can have several sessions (paths) re-establishing
1729 * connections in parallel, use a lock.
1731 mutex_lock(&clt->paths_mutex);
1732 clt->queue_depth = min_not_zero(sess->queue_depth,
1733 clt->queue_depth);
1734 clt->max_io_size = min_not_zero(sess->max_io_size,
1735 clt->max_io_size);
1736 mutex_unlock(&clt->paths_mutex);
1739 * Cache the hca_port and hca_name for sysfs
1741 sess->hca_port = con->c.cm_id->port_num;
1742 scnprintf(sess->hca_name, sizeof(sess->hca_name),
1743 sess->s.dev->ib_dev->name);
1744 sess->s.src_addr = con->c.cm_id->route.addr.src_addr;
1747 return 0;
1750 static inline void flag_success_on_conn(struct rtrs_clt_con *con)
1752 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
1754 atomic_inc(&sess->connected_cnt);
1755 con->cm_err = 1;
1758 static int rtrs_rdma_conn_rejected(struct rtrs_clt_con *con,
1759 struct rdma_cm_event *ev)
1761 struct rtrs_sess *s = con->c.sess;
1762 const struct rtrs_msg_conn_rsp *msg;
1763 const char *rej_msg;
1764 int status, errno;
1765 u8 data_len;
1767 status = ev->status;
1768 rej_msg = rdma_reject_msg(con->c.cm_id, status);
1769 msg = rdma_consumer_reject_data(con->c.cm_id, ev, &data_len);
1771 if (msg && data_len >= sizeof(*msg)) {
1772 errno = (int16_t)le16_to_cpu(msg->errno);
1773 if (errno == -EBUSY)
1774 rtrs_err(s,
1775 "Previous session is still exists on the server, please reconnect later\n");
1776 else
1777 rtrs_err(s,
1778 "Connect rejected: status %d (%s), rtrs errno %d\n",
1779 status, rej_msg, errno);
1780 } else {
1781 rtrs_err(s,
1782 "Connect rejected but with malformed message: status %d (%s)\n",
1783 status, rej_msg);
1786 return -ECONNRESET;
1789 static void rtrs_clt_close_conns(struct rtrs_clt_sess *sess, bool wait)
1791 if (rtrs_clt_change_state(sess, RTRS_CLT_CLOSING))
1792 queue_work(rtrs_wq, &sess->close_work);
1793 if (wait)
1794 flush_work(&sess->close_work);
1797 static inline void flag_error_on_conn(struct rtrs_clt_con *con, int cm_err)
1799 if (con->cm_err == 1) {
1800 struct rtrs_clt_sess *sess;
1802 sess = to_clt_sess(con->c.sess);
1803 if (atomic_dec_and_test(&sess->connected_cnt))
1805 wake_up(&sess->state_wq);
1807 con->cm_err = cm_err;
1810 static int rtrs_clt_rdma_cm_handler(struct rdma_cm_id *cm_id,
1811 struct rdma_cm_event *ev)
1813 struct rtrs_clt_con *con = cm_id->context;
1814 struct rtrs_sess *s = con->c.sess;
1815 struct rtrs_clt_sess *sess = to_clt_sess(s);
1816 int cm_err = 0;
1818 switch (ev->event) {
1819 case RDMA_CM_EVENT_ADDR_RESOLVED:
1820 cm_err = rtrs_rdma_addr_resolved(con);
1821 break;
1822 case RDMA_CM_EVENT_ROUTE_RESOLVED:
1823 cm_err = rtrs_rdma_route_resolved(con);
1824 break;
1825 case RDMA_CM_EVENT_ESTABLISHED:
1826 cm_err = rtrs_rdma_conn_established(con, ev);
1827 if (likely(!cm_err)) {
1829 * Report success and wake up. Here we abuse state_wq,
1830 * i.e. wake up without state change, but we set cm_err.
1832 flag_success_on_conn(con);
1833 wake_up(&sess->state_wq);
1834 return 0;
1836 break;
1837 case RDMA_CM_EVENT_REJECTED:
1838 cm_err = rtrs_rdma_conn_rejected(con, ev);
1839 break;
1840 case RDMA_CM_EVENT_DISCONNECTED:
1841 /* No message for disconnecting */
1842 cm_err = -ECONNRESET;
1843 break;
1844 case RDMA_CM_EVENT_CONNECT_ERROR:
1845 case RDMA_CM_EVENT_UNREACHABLE:
1846 case RDMA_CM_EVENT_ADDR_CHANGE:
1847 case RDMA_CM_EVENT_TIMEWAIT_EXIT:
1848 rtrs_wrn(s, "CM error event %d\n", ev->event);
1849 cm_err = -ECONNRESET;
1850 break;
1851 case RDMA_CM_EVENT_ADDR_ERROR:
1852 case RDMA_CM_EVENT_ROUTE_ERROR:
1853 rtrs_wrn(s, "CM error event %d\n", ev->event);
1854 cm_err = -EHOSTUNREACH;
1855 break;
1856 case RDMA_CM_EVENT_DEVICE_REMOVAL:
1858 * Device removal is a special case. Queue close and return 0.
1860 rtrs_clt_close_conns(sess, false);
1861 return 0;
1862 default:
1863 rtrs_err(s, "Unexpected RDMA CM event (%d)\n", ev->event);
1864 cm_err = -ECONNRESET;
1865 break;
1868 if (cm_err) {
1870 * A CM error makes sense only while establishing a connection;
1871 * in other cases we rely on the normal reconnect procedure.
1873 flag_error_on_conn(con, cm_err);
1874 rtrs_rdma_error_recovery(con);
1877 return 0;
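/*
 * create_cm() - create the rdma_cm id for a connection, start address
 * resolution and wait (with a timeout) until the CM handler reports either
 * success or an error via con->cm_err or a session state change.
 */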
1880 static int create_cm(struct rtrs_clt_con *con)
1882 struct rtrs_sess *s = con->c.sess;
1883 struct rtrs_clt_sess *sess = to_clt_sess(s);
1884 struct rdma_cm_id *cm_id;
1885 int err;
1887 cm_id = rdma_create_id(&init_net, rtrs_clt_rdma_cm_handler, con,
1888 sess->s.dst_addr.ss_family == AF_IB ?
1889 RDMA_PS_IB : RDMA_PS_TCP, IB_QPT_RC);
1890 if (IS_ERR(cm_id)) {
1891 err = PTR_ERR(cm_id);
1892 rtrs_err(s, "Failed to create CM ID, err: %d\n", err);
1894 return err;
1896 con->c.cm_id = cm_id;
1897 con->cm_err = 0;
1898 /* allow the port to be reused */
1899 err = rdma_set_reuseaddr(cm_id, 1);
1900 if (err != 0) {
1901 rtrs_err(s, "Set address reuse failed, err: %d\n", err);
1902 goto destroy_cm;
1904 err = rdma_resolve_addr(cm_id, (struct sockaddr *)&sess->s.src_addr,
1905 (struct sockaddr *)&sess->s.dst_addr,
1906 RTRS_CONNECT_TIMEOUT_MS);
1907 if (err) {
1908 rtrs_err(s, "Failed to resolve address, err: %d\n", err);
1909 goto destroy_cm;
1912 * Combine connection status and session events. This is needed
1913 * to wait for two possible cases: cm_err has something meaningful
1914 * or the session state was really changed to an error by device removal.
1916 err = wait_event_interruptible_timeout(
1917 sess->state_wq,
1918 con->cm_err || sess->state != RTRS_CLT_CONNECTING,
1919 msecs_to_jiffies(RTRS_CONNECT_TIMEOUT_MS));
1920 if (err == 0 || err == -ERESTARTSYS) {
1921 if (err == 0)
1922 err = -ETIMEDOUT;
1923 /* Timedout or interrupted */
1924 goto errr;
1926 if (con->cm_err < 0) {
1927 err = con->cm_err;
1928 goto errr;
1930 if (READ_ONCE(sess->state) != RTRS_CLT_CONNECTING) {
1931 /* Device removal */
1932 err = -ECONNABORTED;
1933 goto errr;
1936 return 0;
1938 errr:
1939 stop_cm(con);
1940 mutex_lock(&con->con_mutex);
1941 destroy_con_cq_qp(con);
1942 mutex_unlock(&con->con_mutex);
1943 destroy_cm:
1944 destroy_cm(con);
1946 return err;
1949 static void rtrs_clt_sess_up(struct rtrs_clt_sess *sess)
1951 struct rtrs_clt *clt = sess->clt;
1952 int up;
1955 * We can fire RECONNECTED event only when all paths were
1956 * connected on rtrs_clt_open(), then each was disconnected
1957 * and the first one connected again. That's why this nasty
1958 * game with counter value.
1961 mutex_lock(&clt->paths_ev_mutex);
1962 up = ++clt->paths_up;
1964 * Here it is safe to access paths num directly since up counter
1965 * is greater than MAX_PATHS_NUM only while rtrs_clt_open() is
1966 * in progress, thus paths removals are impossible.
1968 if (up > MAX_PATHS_NUM && up == MAX_PATHS_NUM + clt->paths_num)
1969 clt->paths_up = clt->paths_num;
1970 else if (up == 1)
1971 clt->link_ev(clt->priv, RTRS_CLT_LINK_EV_RECONNECTED);
1972 mutex_unlock(&clt->paths_ev_mutex);
1974 /* Mark session as established */
1975 sess->established = true;
1976 sess->reconnect_attempts = 0;
1977 sess->stats->reconnects.successful_cnt++;
1980 static void rtrs_clt_sess_down(struct rtrs_clt_sess *sess)
1982 struct rtrs_clt *clt = sess->clt;
1984 if (!sess->established)
1985 return;
1987 sess->established = false;
1988 mutex_lock(&clt->paths_ev_mutex);
1989 WARN_ON(!clt->paths_up);
1990 if (--clt->paths_up == 0)
1991 clt->link_ev(clt->priv, RTRS_CLT_LINK_EV_DISCONNECTED);
1992 mutex_unlock(&clt->paths_ev_mutex);
1995 static void rtrs_clt_stop_and_destroy_conns(struct rtrs_clt_sess *sess)
1997 struct rtrs_clt_con *con;
1998 unsigned int cid;
2000 WARN_ON(READ_ONCE(sess->state) == RTRS_CLT_CONNECTED);
2003 * Possible race with rtrs_clt_open(), when DEVICE_REMOVAL comes
2004 * exactly in between. Start destroying after it finishes.
2006 mutex_lock(&sess->init_mutex);
2007 mutex_unlock(&sess->init_mutex);
2010 * All IO paths must observe !CONNECTED state before we
2011 * free everything.
2013 synchronize_rcu();
2015 rtrs_clt_stop_hb(sess);
2018 * The order is utterly crucial: first disconnect and complete all
2019 * rdma requests with an error (thus setting in_use=false for the requests),
2020 * then fail outstanding requests checking in_use for each, and
2021 * eventually notify the upper layer about the session disconnection.
2024 for (cid = 0; cid < sess->s.con_num; cid++) {
2025 if (!sess->s.con[cid])
2026 break;
2027 con = to_clt_con(sess->s.con[cid]);
2028 stop_cm(con);
2030 fail_all_outstanding_reqs(sess);
2031 free_sess_reqs(sess);
2032 rtrs_clt_sess_down(sess);
2035 * Wait for graceful shutdown, namely when the peer side invokes
2036 * rdma_disconnect(). 'connected_cnt' is decremented only on
2037 * CM events, thus if the other side has crashed and the hb has detected
2038 * that something is wrong, we will be stuck here for exactly the timeout,
2039 * since CM does not fire anything. That is fine, we are not in a
2040 * hurry.
2042 wait_event_timeout(sess->state_wq, !atomic_read(&sess->connected_cnt),
2043 msecs_to_jiffies(RTRS_CONNECT_TIMEOUT_MS));
2045 for (cid = 0; cid < sess->s.con_num; cid++) {
2046 if (!sess->s.con[cid])
2047 break;
2048 con = to_clt_con(sess->s.con[cid]);
2049 mutex_lock(&con->con_mutex);
2050 destroy_con_cq_qp(con);
2051 mutex_unlock(&con->con_mutex);
2052 destroy_cm(con);
2053 destroy_con(con);
2057 static inline bool xchg_sessions(struct rtrs_clt_sess __rcu **rcu_ppcpu_path,
2058 struct rtrs_clt_sess *sess,
2059 struct rtrs_clt_sess *next)
2061 struct rtrs_clt_sess **ppcpu_path;
2063 /* Call cmpxchg() without sparse warnings */
2064 ppcpu_path = (typeof(ppcpu_path))rcu_ppcpu_path;
2065 return sess == cmpxchg(ppcpu_path, sess, next);
2068 static void rtrs_clt_remove_path_from_arr(struct rtrs_clt_sess *sess)
2070 struct rtrs_clt *clt = sess->clt;
2071 struct rtrs_clt_sess *next;
2072 bool wait_for_grace = false;
2073 int cpu;
2075 mutex_lock(&clt->paths_mutex);
2076 list_del_rcu(&sess->s.entry);
2078 /* Make sure everybody observes path removal. */
2079 synchronize_rcu();
2082 * At this point nobody sees @sess in the list, but still we have
2083 * dangling pointer @pcpu_path which _can_ point to @sess. Since
2084 * nobody can observe @sess in the list, we guarantee that IO path
2085 * will not assign @sess to @pcpu_path, i.e. @pcpu_path can be equal
2086 * to @sess, but can never again become @sess.
2090 * Decrement the paths number only after the grace period, because
2091 * the caller of do_each_path() must first observe the list without
2092 * the path and only then the decremented paths number.
2094 * Otherwise there can be the following situation:
2095 * o Two paths exist and IO is coming.
2096 * o One path is removed:
2097 * CPU#0 CPU#1
2098 * do_each_path(): rtrs_clt_remove_path_from_arr():
2099 * path = get_next_path()
2100 * ^^^ list_del_rcu(path)
2101 * [!CONNECTED path] clt->paths_num--
2102 * ^^^^^^^^^
2103 * load clt->paths_num from 2 to 1
2104 * ^^^^^^^^^
2105 * sees 1
2107 * The path is observed as !CONNECTED, but the do_each_path() loop
2108 * ends, because the expression i < clt->paths_num is false.
2110 clt->paths_num--;
2113 * Get the @next path after the current @sess which is going to be
2114 * removed. If @sess is the last element, then @next is NULL.
2116 rcu_read_lock();
2117 next = list_next_or_null_rr_rcu(&clt->paths_list, &sess->s.entry,
2118 typeof(*next), s.entry);
2119 rcu_read_unlock();
2122 * @pcpu paths can still point to the path which is going to be
2123 * removed, so change the pointer manually.
2125 for_each_possible_cpu(cpu) {
2126 struct rtrs_clt_sess __rcu **ppcpu_path;
2128 ppcpu_path = per_cpu_ptr(clt->pcpu_path, cpu);
2129 if (rcu_dereference_protected(*ppcpu_path,
2130 lockdep_is_held(&clt->paths_mutex)) != sess)
2132 * synchronize_rcu() was called just after deleting the
2133 * entry from the list, thus the IO code path cannot
2134 * change the pointer back to the one which is going
2135 * to be removed, so we are safe here.
2137 continue;
2140 * We race with the IO code path, which also changes the
2141 * pointer, thus we have to be careful not to overwrite it.
2143 if (xchg_sessions(ppcpu_path, sess, next))
2145 * @ppcpu_path was successfully replaced with @next,
2146 * which means that someone could also have picked up
2147 * @sess and be dereferencing it right now, so waiting
2148 * for a grace period is required.
2150 wait_for_grace = true;
2152 if (wait_for_grace)
2153 synchronize_rcu();
2155 mutex_unlock(&clt->paths_mutex);
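/*
 * Removal protocol recap (summarizing the code above, nothing new): first
 * list_del_rcu() plus synchronize_rcu() so that no reader can pick @sess
 * from the list anymore, then paths_num is decremented, then every per-CPU
 * cached pointer still equal to @sess is swapped to @next via cmpxchg(),
 * and a second grace period is taken only if at least one swap succeeded.
 */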
2158 static void rtrs_clt_add_path_to_arr(struct rtrs_clt_sess *sess)
2160 struct rtrs_clt *clt = sess->clt;
2162 mutex_lock(&clt->paths_mutex);
2163 clt->paths_num++;
2165 list_add_tail_rcu(&sess->s.entry, &clt->paths_list);
2166 mutex_unlock(&clt->paths_mutex);
2169 static void rtrs_clt_close_work(struct work_struct *work)
2171 struct rtrs_clt_sess *sess;
2173 sess = container_of(work, struct rtrs_clt_sess, close_work);
2175 cancel_delayed_work_sync(&sess->reconnect_dwork);
2176 rtrs_clt_stop_and_destroy_conns(sess);
2177 rtrs_clt_change_state(sess, RTRS_CLT_CLOSED);
2180 static int init_conns(struct rtrs_clt_sess *sess)
2182 unsigned int cid;
2183 int err;
2186 * On every new set of session connections, increase the reconnect
2187 * counter to avoid clashes with previous, not yet closed, sessions
2188 * on the server side.
2190 sess->s.recon_cnt++;
2192 /* Establish all RDMA connections */
2193 for (cid = 0; cid < sess->s.con_num; cid++) {
2194 err = create_con(sess, cid);
2195 if (err)
2196 goto destroy;
2198 err = create_cm(to_clt_con(sess->s.con[cid]));
2199 if (err) {
2200 destroy_con(to_clt_con(sess->s.con[cid]));
2201 goto destroy;
2204 err = alloc_sess_reqs(sess);
2205 if (err)
2206 goto destroy;
2208 rtrs_clt_start_hb(sess);
2210 return 0;
2212 destroy:
2213 while (cid--) {
2214 struct rtrs_clt_con *con = to_clt_con(sess->s.con[cid]);
2216 stop_cm(con);
2218 mutex_lock(&con->con_mutex);
2219 destroy_con_cq_qp(con);
2220 mutex_unlock(&con->con_mutex);
2221 destroy_cm(con);
2222 destroy_con(con);
2225 * If we have never taken the async path and got an error, say,
2226 * while doing rdma_resolve_addr(), switch to the CONNECTING_ERR
2227 * state manually to keep reconnecting.
2229 rtrs_clt_change_state(sess, RTRS_CLT_CONNECTING_ERR);
2231 return err;
2234 static void rtrs_clt_info_req_done(struct ib_cq *cq, struct ib_wc *wc)
2236 struct rtrs_clt_con *con = cq->cq_context;
2237 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
2238 struct rtrs_iu *iu;
2240 iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
2241 rtrs_iu_free(iu, sess->s.dev->ib_dev, 1);
2243 if (unlikely(wc->status != IB_WC_SUCCESS)) {
2244 rtrs_err(sess->clt, "Sess info request send failed: %s\n",
2245 ib_wc_status_msg(wc->status));
2246 rtrs_clt_change_state(sess, RTRS_CLT_CONNECTING_ERR);
2247 return;
2250 rtrs_clt_update_wc_stats(con);
2253 static int process_info_rsp(struct rtrs_clt_sess *sess,
2254 const struct rtrs_msg_info_rsp *msg)
2256 unsigned int sg_cnt, total_len;
2257 int i, sgi;
2259 sg_cnt = le16_to_cpu(msg->sg_cnt);
2260 if (unlikely(!sg_cnt || (sess->queue_depth % sg_cnt))) {
2261 rtrs_err(sess->clt, "Incorrect sg_cnt %d, is not multiple\n",
2262 sg_cnt);
2263 return -EINVAL;
2267 * Check if IB immediate data size is enough to hold the mem_id and
2268 * the offset inside the memory chunk.
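 *
 * Worked example (illustrative numbers, not taken from this code): with
 * sg_cnt = 128 buffers and chunk_size = 64 KiB, encoding a buffer id plus
 * an offset inside a chunk needs (ilog2(127) + 1) + (ilog2(65535) + 1) =
 * 7 + 16 = 23 bits of immediate payload, which must not exceed
 * MAX_IMM_PAYL_BITS.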
2270 if (unlikely((ilog2(sg_cnt - 1) + 1) +
2271 (ilog2(sess->chunk_size - 1) + 1) >
2272 MAX_IMM_PAYL_BITS)) {
2273 rtrs_err(sess->clt,
2274 "RDMA immediate size (%db) not enough to encode %d buffers of size %dB\n",
2275 MAX_IMM_PAYL_BITS, sg_cnt, sess->chunk_size);
2276 return -EINVAL;
2278 total_len = 0;
2279 for (sgi = 0, i = 0; sgi < sg_cnt && i < sess->queue_depth; sgi++) {
2280 const struct rtrs_sg_desc *desc = &msg->desc[sgi];
2281 u32 len, rkey;
2282 u64 addr;
2284 addr = le64_to_cpu(desc->addr);
2285 rkey = le32_to_cpu(desc->key);
2286 len = le32_to_cpu(desc->len);
2288 total_len += len;
2290 if (unlikely(!len || (len % sess->chunk_size))) {
2291 rtrs_err(sess->clt, "Incorrect [%d].len %d\n", sgi,
2292 len);
2293 return -EINVAL;
2295 for ( ; len && i < sess->queue_depth; i++) {
2296 sess->rbufs[i].addr = addr;
2297 sess->rbufs[i].rkey = rkey;
2299 len -= sess->chunk_size;
2300 addr += sess->chunk_size;
2303 /* Sanity check */
2304 if (unlikely(sgi != sg_cnt || i != sess->queue_depth)) {
2305 rtrs_err(sess->clt, "Incorrect sg vector, not fully mapped\n");
2306 return -EINVAL;
2308 if (unlikely(total_len != sess->chunk_size * sess->queue_depth)) {
2309 rtrs_err(sess->clt, "Incorrect total_len %d\n", total_len);
2310 return -EINVAL;
2313 return 0;
2316 static void rtrs_clt_info_rsp_done(struct ib_cq *cq, struct ib_wc *wc)
2318 struct rtrs_clt_con *con = cq->cq_context;
2319 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
2320 struct rtrs_msg_info_rsp *msg;
2321 enum rtrs_clt_state state;
2322 struct rtrs_iu *iu;
2323 size_t rx_sz;
2324 int err;
2326 state = RTRS_CLT_CONNECTING_ERR;
2328 WARN_ON(con->c.cid);
2329 iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
2330 if (unlikely(wc->status != IB_WC_SUCCESS)) {
2331 rtrs_err(sess->clt, "Sess info response recv failed: %s\n",
2332 ib_wc_status_msg(wc->status));
2333 goto out;
2335 WARN_ON(wc->opcode != IB_WC_RECV);
2337 if (unlikely(wc->byte_len < sizeof(*msg))) {
2338 rtrs_err(sess->clt, "Sess info response is malformed: size %d\n",
2339 wc->byte_len);
2340 goto out;
2342 ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, iu->dma_addr,
2343 iu->size, DMA_FROM_DEVICE);
2344 msg = iu->buf;
2345 if (unlikely(le16_to_cpu(msg->type) != RTRS_MSG_INFO_RSP)) {
2346 rtrs_err(sess->clt, "Sess info response is malformed: type %d\n",
2347 le16_to_cpu(msg->type));
2348 goto out;
2350 rx_sz = sizeof(*msg);
2351 rx_sz += sizeof(msg->desc[0]) * le16_to_cpu(msg->sg_cnt);
2352 if (unlikely(wc->byte_len < rx_sz)) {
2353 rtrs_err(sess->clt, "Sess info response is malformed: size %d\n",
2354 wc->byte_len);
2355 goto out;
2357 err = process_info_rsp(sess, msg);
2358 if (unlikely(err))
2359 goto out;
2361 err = post_recv_sess(sess);
2362 if (unlikely(err))
2363 goto out;
2365 state = RTRS_CLT_CONNECTED;
2367 out:
2368 rtrs_clt_update_wc_stats(con);
2369 rtrs_iu_free(iu, sess->s.dev->ib_dev, 1);
2370 rtrs_clt_change_state(sess, state);
2373 static int rtrs_send_sess_info(struct rtrs_clt_sess *sess)
2375 struct rtrs_clt_con *usr_con = to_clt_con(sess->s.con[0]);
2376 struct rtrs_msg_info_req *msg;
2377 struct rtrs_iu *tx_iu, *rx_iu;
2378 size_t rx_sz;
2379 int err;
2381 rx_sz = sizeof(struct rtrs_msg_info_rsp);
2382 rx_sz += sizeof(u64) * MAX_SESS_QUEUE_DEPTH;
2384 tx_iu = rtrs_iu_alloc(1, sizeof(struct rtrs_msg_info_req), GFP_KERNEL,
2385 sess->s.dev->ib_dev, DMA_TO_DEVICE,
2386 rtrs_clt_info_req_done);
2387 rx_iu = rtrs_iu_alloc(1, rx_sz, GFP_KERNEL, sess->s.dev->ib_dev,
2388 DMA_FROM_DEVICE, rtrs_clt_info_rsp_done);
2389 if (unlikely(!tx_iu || !rx_iu)) {
2390 err = -ENOMEM;
2391 goto out;
2393 /* Prepare for getting info response */
2394 err = rtrs_iu_post_recv(&usr_con->c, rx_iu);
2395 if (unlikely(err)) {
2396 rtrs_err(sess->clt, "rtrs_iu_post_recv(), err: %d\n", err);
2397 goto out;
2399 rx_iu = NULL;
2401 msg = tx_iu->buf;
2402 msg->type = cpu_to_le16(RTRS_MSG_INFO_REQ);
2403 memcpy(msg->sessname, sess->s.sessname, sizeof(msg->sessname));
2405 ib_dma_sync_single_for_device(sess->s.dev->ib_dev, tx_iu->dma_addr,
2406 tx_iu->size, DMA_TO_DEVICE);
2408 /* Send info request */
2409 err = rtrs_iu_post_send(&usr_con->c, tx_iu, sizeof(*msg), NULL);
2410 if (unlikely(err)) {
2411 rtrs_err(sess->clt, "rtrs_iu_post_send(), err: %d\n", err);
2412 goto out;
2414 tx_iu = NULL;
2416 /* Wait for state change */
2417 wait_event_interruptible_timeout(sess->state_wq,
2418 sess->state != RTRS_CLT_CONNECTING,
2419 msecs_to_jiffies(
2420 RTRS_CONNECT_TIMEOUT_MS));
2421 if (unlikely(READ_ONCE(sess->state) != RTRS_CLT_CONNECTED)) {
2422 if (READ_ONCE(sess->state) == RTRS_CLT_CONNECTING_ERR)
2423 err = -ECONNRESET;
2424 else
2425 err = -ETIMEDOUT;
2426 goto out;
2429 out:
2430 if (tx_iu)
2431 rtrs_iu_free(tx_iu, sess->s.dev->ib_dev, 1);
2432 if (rx_iu)
2433 rtrs_iu_free(rx_iu, sess->s.dev->ib_dev, 1);
2434 if (unlikely(err))
2435 /* If we have never taken the async path, e.g. because of an allocation failure */
2436 rtrs_clt_change_state(sess, RTRS_CLT_CONNECTING_ERR);
2438 return err;
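/*
 * Handshake recap (descriptive only): the receive buffer for the info
 * response is posted before the RTRS_MSG_INFO_REQ is sent on the user
 * connection (cid 0), and the caller then sleeps on state_wq until
 * rtrs_clt_info_rsp_done() moves the session to CONNECTED or
 * CONNECTING_ERR, or the RTRS_CONNECT_TIMEOUT_MS timeout fires.
 */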
2442 * init_sess() - establishes all session connections and does the handshake
2443 * @sess: client session.
2444 * In case of an error the full close or reconnect procedure should be taken,
2445 * because the reconnect or close async work may already have been started.
2447 static int init_sess(struct rtrs_clt_sess *sess)
2449 int err;
2451 mutex_lock(&sess->init_mutex);
2452 err = init_conns(sess);
2453 if (err) {
2454 rtrs_err(sess->clt, "init_conns(), err: %d\n", err);
2455 goto out;
2457 err = rtrs_send_sess_info(sess);
2458 if (err) {
2459 rtrs_err(sess->clt, "rtrs_send_sess_info(), err: %d\n", err);
2460 goto out;
2462 rtrs_clt_sess_up(sess);
2463 out:
2464 mutex_unlock(&sess->init_mutex);
2466 return err;
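/*
 * init_sess() thus serializes, under init_mutex, the two phases every
 * (re)connect goes through: establishing the RDMA connections and the
 * info handshake. Only when both succeed is the path reported up via
 * rtrs_clt_sess_up().
 */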
2469 static void rtrs_clt_reconnect_work(struct work_struct *work)
2471 struct rtrs_clt_sess *sess;
2472 struct rtrs_clt *clt;
2473 unsigned int delay_ms;
2474 int err;
2476 sess = container_of(to_delayed_work(work), struct rtrs_clt_sess,
2477 reconnect_dwork);
2478 clt = sess->clt;
2480 if (READ_ONCE(sess->state) != RTRS_CLT_RECONNECTING)
2481 return;
2483 if (sess->reconnect_attempts >= clt->max_reconnect_attempts) {
2484 /* Close a session completely if max attempts is reached */
2485 rtrs_clt_close_conns(sess, false);
2486 return;
2488 sess->reconnect_attempts++;
2490 /* Stop everything */
2491 rtrs_clt_stop_and_destroy_conns(sess);
2492 msleep(RTRS_RECONNECT_BACKOFF);
2493 if (rtrs_clt_change_state(sess, RTRS_CLT_CONNECTING)) {
2494 err = init_sess(sess);
2495 if (err)
2496 goto reconnect_again;
2499 return;
2501 reconnect_again:
2502 if (rtrs_clt_change_state(sess, RTRS_CLT_RECONNECTING)) {
2503 sess->stats->reconnects.fail_cnt++;
2504 delay_ms = clt->reconnect_delay_sec * 1000;
2505 queue_delayed_work(rtrs_wq, &sess->reconnect_dwork,
2506 msecs_to_jiffies(delay_ms +
2507 prandom_u32() %
2508 RTRS_RECONNECT_SEED));
2512 static void rtrs_clt_dev_release(struct device *dev)
2514 struct rtrs_clt *clt = container_of(dev, struct rtrs_clt, dev);
2516 kfree(clt);
2519 static struct rtrs_clt *alloc_clt(const char *sessname, size_t paths_num,
2520 u16 port, size_t pdu_sz, void *priv,
2521 void (*link_ev)(void *priv,
2522 enum rtrs_clt_link_ev ev),
2523 unsigned int max_segments,
2524 size_t max_segment_size,
2525 unsigned int reconnect_delay_sec,
2526 unsigned int max_reconnect_attempts)
2528 struct rtrs_clt *clt;
2529 int err;
2531 if (!paths_num || paths_num > MAX_PATHS_NUM)
2532 return ERR_PTR(-EINVAL);
2534 if (strlen(sessname) >= sizeof(clt->sessname))
2535 return ERR_PTR(-EINVAL);
2537 clt = kzalloc(sizeof(*clt), GFP_KERNEL);
2538 if (!clt)
2539 return ERR_PTR(-ENOMEM);
2541 clt->pcpu_path = alloc_percpu(typeof(*clt->pcpu_path));
2542 if (!clt->pcpu_path) {
2543 kfree(clt);
2544 return ERR_PTR(-ENOMEM);
2547 uuid_gen(&clt->paths_uuid);
2548 INIT_LIST_HEAD_RCU(&clt->paths_list);
2549 clt->paths_num = paths_num;
2550 clt->paths_up = MAX_PATHS_NUM;
2551 clt->port = port;
2552 clt->pdu_sz = pdu_sz;
2553 clt->max_segments = max_segments;
2554 clt->max_segment_size = max_segment_size;
2555 clt->reconnect_delay_sec = reconnect_delay_sec;
2556 clt->max_reconnect_attempts = max_reconnect_attempts;
2557 clt->priv = priv;
2558 clt->link_ev = link_ev;
2559 clt->mp_policy = MP_POLICY_MIN_INFLIGHT;
2560 strlcpy(clt->sessname, sessname, sizeof(clt->sessname));
2561 init_waitqueue_head(&clt->permits_wait);
2562 mutex_init(&clt->paths_ev_mutex);
2563 mutex_init(&clt->paths_mutex);
2565 clt->dev.class = rtrs_clt_dev_class;
2566 clt->dev.release = rtrs_clt_dev_release;
2567 err = dev_set_name(&clt->dev, "%s", sessname);
2568 if (err) {
2569 free_percpu(clt->pcpu_path);
2570 kfree(clt);
2571 return ERR_PTR(err);
2574 * Suppress user space notification until
2575 * sysfs files are created
2577 dev_set_uevent_suppress(&clt->dev, true);
2578 err = device_register(&clt->dev);
2579 if (err) {
2580 free_percpu(clt->pcpu_path);
2581 put_device(&clt->dev);
2582 return ERR_PTR(err);
2585 clt->kobj_paths = kobject_create_and_add("paths", &clt->dev.kobj);
2586 if (!clt->kobj_paths) {
2587 free_percpu(clt->pcpu_path);
2588 device_unregister(&clt->dev);
2589 return NULL;
2591 err = rtrs_clt_create_sysfs_root_files(clt);
2592 if (err) {
2593 free_percpu(clt->pcpu_path);
2594 kobject_del(clt->kobj_paths);
2595 kobject_put(clt->kobj_paths);
2596 device_unregister(&clt->dev);
2597 return ERR_PTR(err);
2599 dev_set_uevent_suppress(&clt->dev, false);
2600 kobject_uevent(&clt->dev.kobj, KOBJ_ADD);
2602 return clt;
2605 static void wait_for_inflight_permits(struct rtrs_clt *clt)
2607 if (clt->permits_map) {
2608 size_t sz = clt->queue_depth;
2610 wait_event(clt->permits_wait,
2611 find_first_bit(clt->permits_map, sz) >= sz);
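/*
 * The wait above completes once find_first_bit() finds no set bit in
 * permits_map, i.e. every previously handed out permit has been returned
 * and no IO can still be in flight against this session set.
 */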
2615 static void free_clt(struct rtrs_clt *clt)
2617 wait_for_inflight_permits(clt);
2618 free_permits(clt);
2619 free_percpu(clt->pcpu_path);
2620 mutex_destroy(&clt->paths_ev_mutex);
2621 mutex_destroy(&clt->paths_mutex);
2622 /* release callback will free clt in last put */
2623 device_unregister(&clt->dev);
2627 * rtrs_clt_open() - Open a session to an RTRS server
2628 * @ops: holds the link event callback and the private pointer.
2629 * @sessname: name of the session
2630 * @paths: Paths to be established defined by their src and dst addresses
2631 * @paths_num: Number of elements in the @paths array
2632 * @port: port to be used by the RTRS session
2633 * @pdu_sz: Size of extra payload which can be accessed after permit allocation.
2634 * @reconnect_delay_sec: time between reconnect tries
2635 * @max_segments: Max. number of segments per IO request
2636 * @max_segment_size: Max. size of one segment
2637 * @max_reconnect_attempts: Number of times to reconnect on error before giving
2638 * up, 0 for disabled, -1 for forever
2640 * Starts session establishment with the rtrs_server. The function can block
2641 * up to ~2000ms before it returns.
2643 * Return a valid pointer on success, otherwise an ERR_PTR()-encoded error.
2645 struct rtrs_clt *rtrs_clt_open(struct rtrs_clt_ops *ops,
2646 const char *sessname,
2647 const struct rtrs_addr *paths,
2648 size_t paths_num, u16 port,
2649 size_t pdu_sz, u8 reconnect_delay_sec,
2650 u16 max_segments,
2651 size_t max_segment_size,
2652 s16 max_reconnect_attempts)
2654 struct rtrs_clt_sess *sess, *tmp;
2655 struct rtrs_clt *clt;
2656 int err, i;
2658 clt = alloc_clt(sessname, paths_num, port, pdu_sz, ops->priv,
2659 ops->link_ev,
2660 max_segments, max_segment_size, reconnect_delay_sec,
2661 max_reconnect_attempts);
2662 if (IS_ERR(clt)) {
2663 err = PTR_ERR(clt);
2664 goto out;
2666 for (i = 0; i < paths_num; i++) {
2667 struct rtrs_clt_sess *sess;
2669 sess = alloc_sess(clt, &paths[i], nr_cpu_ids,
2670 max_segments, max_segment_size);
2671 if (IS_ERR(sess)) {
2672 err = PTR_ERR(sess);
2673 goto close_all_sess;
2675 list_add_tail_rcu(&sess->s.entry, &clt->paths_list);
2677 err = init_sess(sess);
2678 if (err) {
2679 list_del_rcu(&sess->s.entry);
2680 rtrs_clt_close_conns(sess, true);
2681 free_sess(sess);
2682 goto close_all_sess;
2685 err = rtrs_clt_create_sess_files(sess);
2686 if (err) {
2687 list_del_rcu(&sess->s.entry);
2688 rtrs_clt_close_conns(sess, true);
2689 free_sess(sess);
2690 goto close_all_sess;
2693 err = alloc_permits(clt);
2694 if (err)
2695 goto close_all_sess;
2697 return clt;
2699 close_all_sess:
2700 list_for_each_entry_safe(sess, tmp, &clt->paths_list, s.entry) {
2701 rtrs_clt_destroy_sess_files(sess, NULL);
2702 rtrs_clt_close_conns(sess, true);
2703 kobject_put(&sess->kobj);
2705 rtrs_clt_destroy_sysfs_root_files(clt);
2706 rtrs_clt_destroy_sysfs_root_folders(clt);
2707 free_clt(clt);
2709 out:
2710 return ERR_PTR(err);
2712 EXPORT_SYMBOL(rtrs_clt_open);
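/*
 * Illustrative sketch only, not part of this file: how a hypothetical ULP
 * might open and later close a session. All "my_"-prefixed names and the
 * concrete numbers are assumptions made for the example, not values
 * mandated by this API; 30 stands for the reconnect delay in seconds,
 * 32 for the maximum number of segments, SZ_512K for the maximum segment
 * size and -1 means "reconnect forever".
 *
 *	struct rtrs_clt_ops my_ops = {
 *		.priv	 = my_priv,
 *		.link_ev = my_link_event_cb,
 *	};
 *	struct rtrs_clt *clt;
 *
 *	clt = rtrs_clt_open(&my_ops, "my_session", my_paths, my_paths_num,
 *			    my_port, my_pdu_sz, 30, 32, SZ_512K, -1);
 *	if (IS_ERR(clt))
 *		return PTR_ERR(clt);
 *	...
 *	rtrs_clt_close(clt);
 */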
2715 * rtrs_clt_close() - Close a session
2716 * @clt: Session handle. Session is freed upon return.
2718 void rtrs_clt_close(struct rtrs_clt *clt)
2720 struct rtrs_clt_sess *sess, *tmp;
2722 /* First, forbid sysfs access */
2723 rtrs_clt_destroy_sysfs_root_files(clt);
2724 rtrs_clt_destroy_sysfs_root_folders(clt);
2726 /* Now it is safe to iterate over all paths without locks */
2727 list_for_each_entry_safe(sess, tmp, &clt->paths_list, s.entry) {
2728 rtrs_clt_destroy_sess_files(sess, NULL);
2729 rtrs_clt_close_conns(sess, true);
2730 kobject_put(&sess->kobj);
2732 free_clt(clt);
2734 EXPORT_SYMBOL(rtrs_clt_close);
2736 int rtrs_clt_reconnect_from_sysfs(struct rtrs_clt_sess *sess)
2738 enum rtrs_clt_state old_state;
2739 int err = -EBUSY;
2740 bool changed;
2742 changed = rtrs_clt_change_state_get_old(sess, RTRS_CLT_RECONNECTING,
2743 &old_state);
2744 if (changed) {
2745 sess->reconnect_attempts = 0;
2746 queue_delayed_work(rtrs_wq, &sess->reconnect_dwork, 0);
2748 if (changed || old_state == RTRS_CLT_RECONNECTING) {
2750 * flush_delayed_work() queues pending work for immediate
2751 * execution, so do the flush if we have just queued something
2752 * ourselves or if work is already pending.
2754 flush_delayed_work(&sess->reconnect_dwork);
2755 err = (READ_ONCE(sess->state) ==
2756 RTRS_CLT_CONNECTED ? 0 : -ENOTCONN);
2759 return err;
2762 int rtrs_clt_disconnect_from_sysfs(struct rtrs_clt_sess *sess)
2764 rtrs_clt_close_conns(sess, true);
2766 return 0;
2769 int rtrs_clt_remove_path_from_sysfs(struct rtrs_clt_sess *sess,
2770 const struct attribute *sysfs_self)
2772 enum rtrs_clt_state old_state;
2773 bool changed;
2776 * Keep stopping the path until its state is changed to DEAD or
2777 * the state is observed as DEAD:
2778 * 1. The state was changed to DEAD - we were fast and nobody
2779 * invoked rtrs_clt_reconnect(), which could have started
2780 * reconnecting again.
2781 * 2. The state was observed as DEAD - someone is removing the
2782 * path in parallel.
2784 do {
2785 rtrs_clt_close_conns(sess, true);
2786 changed = rtrs_clt_change_state_get_old(sess,
2787 RTRS_CLT_DEAD,
2788 &old_state);
2789 } while (!changed && old_state != RTRS_CLT_DEAD);
2791 if (likely(changed)) {
2792 rtrs_clt_destroy_sess_files(sess, sysfs_self);
2793 rtrs_clt_remove_path_from_arr(sess);
2794 kobject_put(&sess->kobj);
2797 return 0;
2800 void rtrs_clt_set_max_reconnect_attempts(struct rtrs_clt *clt, int value)
2802 clt->max_reconnect_attempts = (unsigned int)value;
2805 int rtrs_clt_get_max_reconnect_attempts(const struct rtrs_clt *clt)
2807 return (int)clt->max_reconnect_attempts;
2811 * rtrs_clt_request() - Request data transfer to/from server via RDMA.
2813 * @dir: READ/WRITE
2814 * @ops: callback function to be called as confirmation, and the pointer.
2815 * @clt: Session
2816 * @permit: Preallocated permit
2817 * @vec: Message that is sent to server together with the request.
2818 * The sum of the lengths of all @vec elements is limited to <= IO_MSG_SIZE.
2819 * Since the msg is copied internally, it can be allocated on the stack.
2820 * @nr: Number of elements in @vec.
2821 * @data_len: length of data sent to/from server
2822 * @sg: Pages to be sent/received to/from server.
2823 * @sg_cnt: Number of elements in the @sg
2825 * Return:
2826 * 0: Success
2827 * <0: Error
2829 * On dir=READ the rtrs client will request a data transfer from the server
2830 * to the client. The data that the server responds with will be stored in
2831 * @sg when the user receives an %RTRS_CLT_RDMA_EV_RDMA_REQUEST_WRITE_COMPL event.
2832 * On dir=WRITE the rtrs client will RDMA-write the data in @sg to the server side.
2834 int rtrs_clt_request(int dir, struct rtrs_clt_req_ops *ops,
2835 struct rtrs_clt *clt, struct rtrs_permit *permit,
2836 const struct kvec *vec, size_t nr, size_t data_len,
2837 struct scatterlist *sg, unsigned int sg_cnt)
2839 struct rtrs_clt_io_req *req;
2840 struct rtrs_clt_sess *sess;
2842 enum dma_data_direction dma_dir;
2843 int err = -ECONNABORTED, i;
2844 size_t usr_len, hdr_len;
2845 struct path_it it;
2847 /* Get kvec length */
2848 for (i = 0, usr_len = 0; i < nr; i++)
2849 usr_len += vec[i].iov_len;
2851 if (dir == READ) {
2852 hdr_len = sizeof(struct rtrs_msg_rdma_read) +
2853 sg_cnt * sizeof(struct rtrs_sg_desc);
2854 dma_dir = DMA_FROM_DEVICE;
2855 } else {
2856 hdr_len = sizeof(struct rtrs_msg_rdma_write);
2857 dma_dir = DMA_TO_DEVICE;
2860 rcu_read_lock();
2861 for (path_it_init(&it, clt);
2862 (sess = it.next_path(&it)) && it.i < it.clt->paths_num; it.i++) {
2863 if (unlikely(READ_ONCE(sess->state) != RTRS_CLT_CONNECTED))
2864 continue;
2866 if (unlikely(usr_len + hdr_len > sess->max_hdr_size)) {
2867 rtrs_wrn_rl(sess->clt,
2868 "%s request failed, user message size is %zu and header length %zu, but max size is %u\n",
2869 dir == READ ? "Read" : "Write",
2870 usr_len, hdr_len, sess->max_hdr_size);
2871 err = -EMSGSIZE;
2872 break;
2874 req = rtrs_clt_get_req(sess, ops->conf_fn, permit, ops->priv,
2875 vec, usr_len, sg, sg_cnt, data_len,
2876 dma_dir);
2877 if (dir == READ)
2878 err = rtrs_clt_read_req(req);
2879 else
2880 err = rtrs_clt_write_req(req);
2881 if (unlikely(err)) {
2882 req->in_use = false;
2883 continue;
2885 /* Success path */
2886 break;
2888 path_it_deinit(&it);
2889 rcu_read_unlock();
2891 return err;
2893 EXPORT_SYMBOL(rtrs_clt_request);
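/*
 * Illustrative sketch only, not part of this file: the request flow as seen
 * from a caller, assuming hypothetical variables (clt, sg, sg_cnt, data_len,
 * my_usr_msg, my_req, my_conf_cb) and the permit helpers exported by this
 * file. The connection-type and wait constants are named here only as an
 * assumption; see the rtrs headers for the actual enum names.
 *
 *	struct rtrs_permit *permit;
 *	struct kvec vec = {
 *		.iov_base = &my_usr_msg,
 *		.iov_len  = sizeof(my_usr_msg),
 *	};
 *	struct rtrs_clt_req_ops req_ops = {
 *		.priv	 = my_req,
 *		.conf_fn = my_conf_cb,
 *	};
 *	int err;
 *
 *	permit = rtrs_clt_get_permit(clt, RTRS_IO_CON, RTRS_PERMIT_WAIT);
 *	if (!permit)
 *		return -EBUSY;
 *	err = rtrs_clt_request(READ, &req_ops, clt, permit, &vec, 1,
 *			       data_len, sg, sg_cnt);
 *	...
 *	rtrs_clt_put_permit(clt, permit);
 */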
2896 * rtrs_clt_query() - queries RTRS session attributes
2897 * @clt: session pointer
2898 * @attr: query results for session attributes.
2899 * Returns:
2900 * 0 on success
2901 * -ECOMM - no connection to the server
2903 int rtrs_clt_query(struct rtrs_clt *clt, struct rtrs_attrs *attr)
2905 if (!rtrs_clt_is_connected(clt))
2906 return -ECOMM;
2908 attr->queue_depth = clt->queue_depth;
2909 attr->max_io_size = clt->max_io_size;
2910 attr->sess_kobj = &clt->dev.kobj;
2911 strlcpy(attr->sessname, clt->sessname, sizeof(attr->sessname));
2913 return 0;
2915 EXPORT_SYMBOL(rtrs_clt_query);
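/*
 * Illustrative sketch only: a caller clamping its IO size to what the
 * session supports (clt and my_max_io are hypothetical variables).
 *
 *	struct rtrs_attrs attr;
 *
 *	if (!rtrs_clt_query(clt, &attr))
 *		my_max_io = min_t(size_t, my_max_io, attr.max_io_size);
 */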
2917 int rtrs_clt_create_path_from_sysfs(struct rtrs_clt *clt,
2918 struct rtrs_addr *addr)
2920 struct rtrs_clt_sess *sess;
2921 int err;
2923 sess = alloc_sess(clt, addr, nr_cpu_ids, clt->max_segments,
2924 clt->max_segment_size);
2925 if (IS_ERR(sess))
2926 return PTR_ERR(sess);
2929 * It is totally safe to add a path in the CONNECTING state: incoming
2930 * IO will never grab it. It is also very important to add the
2931 * path before init, since init fires the LINK_CONNECTED event.
2933 rtrs_clt_add_path_to_arr(sess);
2935 err = init_sess(sess);
2936 if (err)
2937 goto close_sess;
2939 err = rtrs_clt_create_sess_files(sess);
2940 if (err)
2941 goto close_sess;
2943 return 0;
2945 close_sess:
2946 rtrs_clt_remove_path_from_arr(sess);
2947 rtrs_clt_close_conns(sess, true);
2948 free_sess(sess);
2950 return err;
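/*
 * Error handling note: if init_sess() or the sysfs file creation fails,
 * the freshly added path is pulled back out of the array and fully torn
 * down before the error is returned, so a failed hot-add leaves no
 * half-registered path behind.
 */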
2953 static int rtrs_clt_ib_dev_init(struct rtrs_ib_dev *dev)
2955 if (!(dev->ib_dev->attrs.device_cap_flags &
2956 IB_DEVICE_MEM_MGT_EXTENSIONS)) {
2957 pr_err("Memory registrations not supported.\n");
2958 return -ENOTSUPP;
2961 return 0;
2964 static const struct rtrs_rdma_dev_pd_ops dev_pd_ops = {
2965 .init = rtrs_clt_ib_dev_init
2968 static int __init rtrs_client_init(void)
2970 rtrs_rdma_dev_pd_init(0, &dev_pd);
2972 rtrs_clt_dev_class = class_create(THIS_MODULE, "rtrs-client");
2973 if (IS_ERR(rtrs_clt_dev_class)) {
2974 pr_err("Failed to create rtrs-client dev class\n");
2975 return PTR_ERR(rtrs_clt_dev_class);
2977 rtrs_wq = alloc_workqueue("rtrs_client_wq", 0, 0);
2978 if (!rtrs_wq) {
2979 class_destroy(rtrs_clt_dev_class);
2980 return -ENOMEM;
2983 return 0;
2986 static void __exit rtrs_client_exit(void)
2988 destroy_workqueue(rtrs_wq);
2989 class_destroy(rtrs_clt_dev_class);
2990 rtrs_rdma_dev_pd_deinit(&dev_pd);
2993 module_init(rtrs_client_init);
2994 module_exit(rtrs_client_exit);