// SPDX-License-Identifier: GPL-2.0-or-later
/*
 * RDMA Network Block Driver
 *
 * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
 * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
 * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
 */

#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt

#include <linux/module.h>
#include <linux/blkdev.h>
#include <linux/hdreg.h>
#include <linux/scatterlist.h>
#include <linux/idr.h>

#include "rnbd-clt.h"

MODULE_DESCRIPTION("RDMA Network Block Device Client");
MODULE_LICENSE("GPL");

static int rnbd_client_major;
static DEFINE_IDA(index_ida);
static DEFINE_MUTEX(sess_lock);
static LIST_HEAD(sess_list);
static struct workqueue_struct *rnbd_clt_wq;
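
/*
 * Module-global state: the dynamically allocated block major, the IDA that
 * hands out per-device indices, and the session list protected by sess_lock.
 * rnbd_clt_wq is only used on module unload to unmap the remaining devices
 * in parallel (see rnbd_destroy_sessions()).
 */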

/*
 * Maximum number of partitions an instance can have.
 * 6 bits = 64 minors = 63 partitions (one minor is used for the device itself)
 */
#define RNBD_PART_BITS		6

static inline bool rnbd_clt_get_sess(struct rnbd_clt_session *sess)
{
	return refcount_inc_not_zero(&sess->refcount);
}

static void free_sess(struct rnbd_clt_session *sess);

static void rnbd_clt_put_sess(struct rnbd_clt_session *sess)
{
	might_sleep();

	if (refcount_dec_and_test(&sess->refcount))
		free_sess(sess);
}

static void rnbd_clt_put_dev(struct rnbd_clt_dev *dev)
{
	might_sleep();

	if (!refcount_dec_and_test(&dev->refcount))
		return;

	ida_free(&index_ida, dev->clt_device_id);
	kfree(dev->hw_queues);
	kfree(dev->pathname);
	rnbd_clt_put_sess(dev->sess);
	mutex_destroy(&dev->lock);
	kfree(dev);
}

static inline bool rnbd_clt_get_dev(struct rnbd_clt_dev *dev)
{
	return refcount_inc_not_zero(&dev->refcount);
}
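
/*
 * Update the cached gendisk capacity when the server reports a new size;
 * this is a no-op if the size is unchanged.
 */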
static void rnbd_clt_change_capacity(struct rnbd_clt_dev *dev,
				     sector_t new_nsectors)
{
	if (get_capacity(dev->gd) == new_nsectors)
		return;

	/*
	 * If the size changed, we need to revalidate it
	 */
	rnbd_clt_info(dev, "Device size changed from %llu to %llu sectors\n",
		      get_capacity(dev->gd), new_nsectors);
	set_capacity_and_notify(dev->gd, new_nsectors);
}

static int process_msg_open_rsp(struct rnbd_clt_dev *dev,
				struct rnbd_msg_open_rsp *rsp)
{
	struct kobject *gd_kobj;
	int err = 0;

	mutex_lock(&dev->lock);
	if (dev->dev_state == DEV_STATE_UNMAPPED) {
		rnbd_clt_info(dev,
			      "Ignoring Open-Response message from server for unmapped device\n");
		err = -ENOENT;
		goto out;
	}
	if (dev->dev_state == DEV_STATE_MAPPED_DISCONNECTED) {
		u64 nsectors = le64_to_cpu(rsp->nsectors);

		rnbd_clt_change_capacity(dev, nsectors);
		gd_kobj = &disk_to_dev(dev->gd)->kobj;
		kobject_uevent(gd_kobj, KOBJ_ONLINE);
		rnbd_clt_info(dev, "Device online, device remapped successfully\n");
	}
	if (!rsp->logical_block_size) {
		err = -EINVAL;
		goto out;
	}
	dev->device_id = le32_to_cpu(rsp->device_id);
	dev->dev_state = DEV_STATE_MAPPED;

out:
	mutex_unlock(&dev->lock);

	return err;
}
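
/*
 * Resize a mapped device to @newsize sectors; only valid while the device
 * is currently mapped.
 */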
int rnbd_clt_resize_disk(struct rnbd_clt_dev *dev, sector_t newsize)
{
	int ret = 0;

	mutex_lock(&dev->lock);
	if (dev->dev_state != DEV_STATE_MAPPED) {
		pr_err("Failed to set new size of the device, device is not opened\n");
		ret = -ENOENT;
		goto out;
	}
	rnbd_clt_change_capacity(dev, newsize);

out:
	mutex_unlock(&dev->lock);

	return ret;
}

static inline void rnbd_clt_dev_requeue(struct rnbd_queue *q)
{
	if (WARN_ON(!q->hctx))
		return;

	/* We can come here from interrupt, thus async=true */
	blk_mq_run_hw_queue(q->hctx, true);
}
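
/*
 * RNBD_DELAY_IFBUSY is a special "delay" value for rnbd_clt_dev_kick_mq_queue():
 * instead of rerunning the hw queue after a fixed delay, the queue is parked
 * on the session requeue list and rerun once a permit is returned (see
 * rnbd_clt_dev_add_to_requeue() and rnbd_put_permit()).
 */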
enum {
	RNBD_DELAY_IFBUSY = -1,
};

/**
 * rnbd_get_cpu_qlist() - finds a list with HW queues to be rerun
 * @sess:	Session to find a queue for
 * @cpu:	Cpu to start the search from
 *
 * Description:
 *     Each CPU has a list of HW queues, which needs to be rerun.  If a list
 *     is not empty - it is marked with a bit.  This function finds the first
 *     set bit in a bitmap and returns the corresponding CPU list.
 */
static struct rnbd_cpu_qlist *
rnbd_get_cpu_qlist(struct rnbd_clt_session *sess, int cpu)
{
	int bit;

	/* Search from cpu to nr_cpu_ids */
	bit = find_next_bit(sess->cpu_queues_bm, nr_cpu_ids, cpu);
	if (bit < nr_cpu_ids) {
		return per_cpu_ptr(sess->cpu_queues, bit);
	} else if (cpu != 0) {
		/* Search from 0 to cpu */
		bit = find_first_bit(sess->cpu_queues_bm, cpu);
		if (bit < cpu)
			return per_cpu_ptr(sess->cpu_queues, bit);
	}

	return NULL;
}

static inline int nxt_cpu(int cpu)
{
	return (cpu + 1) % nr_cpu_ids;
}
/**
 * rnbd_rerun_if_needed() - rerun next queue marked as stopped
 * @sess:	Session to rerun a queue on
 *
 * Description:
 *     Each CPU has its own list of HW queues, which should be rerun.
 *     This function finds such a list with HW queues, takes the list lock,
 *     picks up the first HW queue out of the list and requeues it.
 *
 * Return:
 *     True if the queue was requeued, false otherwise.
 */
static bool rnbd_rerun_if_needed(struct rnbd_clt_session *sess)
{
	struct rnbd_queue *q = NULL;
	struct rnbd_cpu_qlist *cpu_q;
	unsigned long flags;
	int *cpup;

	/*
	 * To keep fairness and not to let other queues starve we always
	 * try to wake up someone else in round-robin manner.  That of course
	 * increases latency but queues always have a chance to be executed.
	 */
	cpup = get_cpu_ptr(sess->cpu_rr);
	for (cpu_q = rnbd_get_cpu_qlist(sess, nxt_cpu(*cpup)); cpu_q;
	     cpu_q = rnbd_get_cpu_qlist(sess, nxt_cpu(cpu_q->cpu))) {
		if (!spin_trylock_irqsave(&cpu_q->requeue_lock, flags))
			continue;
		if (!test_bit(cpu_q->cpu, sess->cpu_queues_bm))
			goto unlock;
		q = list_first_entry_or_null(&cpu_q->requeue_list,
					     typeof(*q), requeue_list);
		if (WARN_ON(!q))
			goto clear_bit;
		list_del_init(&q->requeue_list);
		clear_bit_unlock(0, &q->in_list);

		if (list_empty(&cpu_q->requeue_list)) {
			/* Clear bit if nothing is left */
clear_bit:
			clear_bit(cpu_q->cpu, sess->cpu_queues_bm);
		}
unlock:
		spin_unlock_irqrestore(&cpu_q->requeue_lock, flags);

		if (q)
			break;
	}

	/*
	 * Saves the CPU that is going to be requeued on the per-cpu var. Just
	 * incrementing it doesn't work because rnbd_get_cpu_qlist() will
	 * always return the first CPU with something on the queue list when the
	 * value stored on the var is greater than the last CPU with something
	 * on the list.
	 */
	if (cpu_q)
		*cpup = cpu_q->cpu;
	put_cpu_ptr(sess->cpu_rr);

	if (q)
		rnbd_clt_dev_requeue(q);

	return q;
}
/**
 * rnbd_rerun_all_if_idle() - rerun all queues left in the list if
 *				 the session is idling (there are no requests
 *				 in-flight).
 * @sess:	Session to rerun the queues on
 *
 * Description:
 *     This function tries to rerun all stopped queues if there are no
 *     requests in-flight anymore.  It solves an obvious problem when the
 *     number of tags is less than the number of queues (hctxs) which are
 *     stopped and put to sleep.  If the last permit, which has just been put,
 *     does not wake up all of the remaining queues (hctxs), IO requests hang
 *     forever.
 *
 *     That can happen when all permits, say N, have been exhausted from one
 *     CPU, and we have many block devices per session, say M.  Each block
 *     device has its own queue (hctx) for each CPU, so eventually we can put
 *     that number of queues (hctxs) to sleep: M x nr_cpu_ids.  If the number
 *     of permits N < M x nr_cpu_ids, we will eventually get an IO hang.
 *
 *     To avoid this hang the last caller of rnbd_put_permit() (the last
 *     caller is the one who observes sess->busy == 0) must wake up all
 *     remaining queues.
 */
static void rnbd_rerun_all_if_idle(struct rnbd_clt_session *sess)
{
	bool requeued;

	do {
		requeued = rnbd_rerun_if_needed(sess);
	} while (atomic_read(&sess->busy) == 0 && requeued);
}

static struct rtrs_permit *rnbd_get_permit(struct rnbd_clt_session *sess,
					   enum rtrs_clt_con_type con_type,
					   enum wait_type wait)
{
	struct rtrs_permit *permit;

	permit = rtrs_clt_get_permit(sess->rtrs, con_type, wait);
	if (permit)
		/* We have a subtle rare case here, when all permits can be
		 * consumed before the busy counter is increased.  This is safe,
		 * because the loser will get NULL as a permit, observe a zero
		 * busy counter and immediately restart the queue itself.
		 */
		atomic_inc(&sess->busy);

	return permit;
}

static void rnbd_put_permit(struct rnbd_clt_session *sess,
			    struct rtrs_permit *permit)
{
	rtrs_clt_put_permit(sess->rtrs, permit);
	atomic_dec(&sess->busy);
	/* Paired with rnbd_clt_dev_add_to_requeue().  Decrement first
	 * and then check queue bits.
	 */
	smp_mb__after_atomic();
	rnbd_rerun_all_if_idle(sess);
}
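
/*
 * An rnbd_iu for admin ("user") messages bundles an RTRS permit with a
 * one-entry sg table and is referenced twice: once for the send path and
 * once for the confirmation callback (see the comment in rnbd_get_iu()).
 */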
static struct rnbd_iu *rnbd_get_iu(struct rnbd_clt_session *sess,
				   enum rtrs_clt_con_type con_type,
				   enum wait_type wait)
{
	struct rnbd_iu *iu;
	struct rtrs_permit *permit;

	iu = kzalloc(sizeof(*iu), GFP_KERNEL);
	if (!iu)
		return NULL;

	permit = rnbd_get_permit(sess, con_type, wait);
	if (!permit) {
		kfree(iu);
		return NULL;
	}
	iu->permit = permit;

	/*
	 * 1st reference is dropped after finishing sending a "user" message,
	 * 2nd reference is dropped after confirmation with the response is
	 * returned.
	 * 1st and 2nd can happen in any order, so the rnbd_iu should be
	 * released (rtrs_permit returned to rtrs) only after both
	 * are finished.
	 */
	atomic_set(&iu->refcount, 2);
	init_waitqueue_head(&iu->comp.wait);
	iu->comp.errno = INT_MAX;

	if (sg_alloc_table(&iu->sgt, 1, GFP_KERNEL)) {
		rnbd_put_permit(sess, permit);
		kfree(iu);
		return NULL;
	}

	return iu;
}

static void rnbd_put_iu(struct rnbd_clt_session *sess, struct rnbd_iu *iu)
{
	if (atomic_dec_and_test(&iu->refcount)) {
		sg_free_table(&iu->sgt);
		rnbd_put_permit(sess, iu->permit);
		kfree(iu);
	}
}

static void rnbd_softirq_done_fn(struct request *rq)
{
	struct rnbd_clt_dev *dev = rq->q->disk->private_data;
	struct rnbd_clt_session *sess = dev->sess;
	struct rnbd_iu *iu;

	iu = blk_mq_rq_to_pdu(rq);
	sg_free_table_chained(&iu->sgt, RNBD_INLINE_SG_CNT);
	rnbd_put_permit(sess, iu->permit);
	blk_mq_end_request(rq, errno_to_blk_status(iu->errno));
}
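
/*
 * RTRS confirmation callback for block I/O: stash the error in the iu and
 * complete the request; the actual cleanup happens in rnbd_softirq_done_fn().
 */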
static void msg_io_conf(void *priv, int errno)
{
	struct rnbd_iu *iu = priv;
	struct rnbd_clt_dev *dev = iu->dev;
	struct request *rq = iu->rq;
	int rw = rq_data_dir(rq);

	iu->errno = errno;

	blk_mq_complete_request(rq);

	if (errno)
		rnbd_clt_info_rl(dev, "%s I/O failed with err: %d\n",
				 rw == READ ? "read" : "write", errno);
}

static void wake_up_iu_comp(struct rnbd_iu *iu, int errno)
{
	iu->comp.errno = errno;
	wake_up(&iu->comp.wait);
}

static void msg_conf(void *priv, int errno)
{
	struct rnbd_iu *iu = priv;

	iu->errno = errno;
	schedule_work(&iu->work);
}
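
/*
 * Send an admin ("user") message over RTRS.  The conf work function runs
 * when RTRS confirms the transfer; when @wait is set we block until the
 * confirmation fires and return the server-side errno via @errno.
 */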
static int send_usr_msg(struct rtrs_clt_sess *rtrs, int dir,
			struct rnbd_iu *iu, struct kvec *vec,
			size_t len, struct scatterlist *sg, unsigned int sg_len,
			void (*conf)(struct work_struct *work),
			int *errno, int wait)
{
	int err;
	struct rtrs_clt_req_ops req_ops;

	INIT_WORK(&iu->work, conf);
	req_ops = (struct rtrs_clt_req_ops) {
		.priv = iu,
		.conf_fn = msg_conf,
	};
	err = rtrs_clt_request(dir, &req_ops, rtrs, iu->permit,
				vec, 1, len, sg, sg_len);
	if (!err && wait) {
		wait_event(iu->comp.wait, iu->comp.errno != INT_MAX);
		*errno = iu->comp.errno;
	} else {
		*errno = 0;
	}

	return err;
}

static void msg_close_conf(struct work_struct *work)
{
	struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work);
	struct rnbd_clt_dev *dev = iu->dev;

	wake_up_iu_comp(iu, iu->errno);
	rnbd_put_iu(dev->sess, iu);
	rnbd_clt_put_dev(dev);
}
static int send_msg_close(struct rnbd_clt_dev *dev, u32 device_id,
			  enum wait_type wait)
{
	struct rnbd_clt_session *sess = dev->sess;
	struct rnbd_msg_close msg;
	struct rnbd_iu *iu;
	struct kvec vec = {
		.iov_base = &msg,
		.iov_len  = sizeof(msg)
	};
	int err, errno;

	iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT);
	if (!iu)
		return -ENOMEM;

	iu->buf = NULL;
	iu->dev = dev;

	msg.hdr.type	= cpu_to_le16(RNBD_MSG_CLOSE);
	msg.device_id	= cpu_to_le32(device_id);

	WARN_ON(!rnbd_clt_get_dev(dev));
	err = send_usr_msg(sess->rtrs, WRITE, iu, &vec, 0, NULL, 0,
			   msg_close_conf, &errno, wait);
	if (err) {
		rnbd_clt_put_dev(dev);
		rnbd_put_iu(sess, iu);
	} else {
		err = errno;
	}

	rnbd_put_iu(sess, iu);
	return err;
}

static void msg_open_conf(struct work_struct *work)
{
	struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work);
	struct rnbd_msg_open_rsp *rsp = iu->buf;
	struct rnbd_clt_dev *dev = iu->dev;
	int errno = iu->errno;
	bool from_map = false;

	/* INIT state is only triggered from rnbd_clt_map_device */
	if (dev->dev_state == DEV_STATE_INIT)
		from_map = true;

	if (errno) {
		rnbd_clt_err(dev,
			     "Opening failed, server responded: %d\n",
			     errno);
	} else {
		errno = process_msg_open_rsp(dev, rsp);
		if (errno) {
			u32 device_id = le32_to_cpu(rsp->device_id);
			/*
			 * If the server thinks it's fine, but we fail to
			 * process the response, then be nice and send a close
			 * to the server.
			 */
			send_msg_close(dev, device_id, RTRS_PERMIT_NOWAIT);
		}
	}
	/* We free rsp in rnbd_clt_map_device for map scenario */
	if (!from_map)
		kfree(rsp);
	wake_up_iu_comp(iu, errno);
	rnbd_put_iu(dev->sess, iu);
	rnbd_clt_put_dev(dev);
}
static void msg_sess_info_conf(struct work_struct *work)
{
	struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work);
	struct rnbd_msg_sess_info_rsp *rsp = iu->buf;
	struct rnbd_clt_session *sess = iu->sess;

	if (!iu->errno)
		sess->ver = min_t(u8, rsp->ver, RNBD_PROTO_VER_MAJOR);

	kfree(rsp);
	wake_up_iu_comp(iu, iu->errno);
	rnbd_put_iu(sess, iu);
	rnbd_clt_put_sess(sess);
}

static int send_msg_open(struct rnbd_clt_dev *dev, enum wait_type wait)
{
	struct rnbd_clt_session *sess = dev->sess;
	struct rnbd_msg_open_rsp *rsp;
	struct rnbd_msg_open msg;
	struct rnbd_iu *iu;
	struct kvec vec = {
		.iov_base = &msg,
		.iov_len  = sizeof(msg)
	};
	int err, errno;

	rsp = kzalloc(sizeof(*rsp), GFP_KERNEL);
	if (!rsp)
		return -ENOMEM;

	iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT);
	if (!iu) {
		kfree(rsp);
		return -ENOMEM;
	}

	iu->buf = rsp;
	iu->dev = dev;

	sg_init_one(iu->sgt.sgl, rsp, sizeof(*rsp));

	msg.hdr.type	= cpu_to_le16(RNBD_MSG_OPEN);
	msg.access_mode	= dev->access_mode;
	strscpy(msg.dev_name, dev->pathname, sizeof(msg.dev_name));

	WARN_ON(!rnbd_clt_get_dev(dev));
	err = send_usr_msg(sess->rtrs, READ, iu,
			   &vec, sizeof(*rsp), iu->sgt.sgl, 1,
			   msg_open_conf, &errno, wait);
	if (err) {
		rnbd_clt_put_dev(dev);
		rnbd_put_iu(sess, iu);
		kfree(rsp);
	} else {
		err = errno;
	}

	rnbd_put_iu(sess, iu);
	return err;
}
static int send_msg_sess_info(struct rnbd_clt_session *sess, enum wait_type wait)
{
	struct rnbd_msg_sess_info_rsp *rsp;
	struct rnbd_msg_sess_info msg;
	struct rnbd_iu *iu;
	struct kvec vec = {
		.iov_base = &msg,
		.iov_len  = sizeof(msg)
	};
	int err, errno;

	rsp = kzalloc(sizeof(*rsp), GFP_KERNEL);
	if (!rsp)
		return -ENOMEM;

	iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT);
	if (!iu) {
		kfree(rsp);
		return -ENOMEM;
	}

	iu->buf = rsp;
	iu->sess = sess;
	sg_init_one(iu->sgt.sgl, rsp, sizeof(*rsp));

	msg.hdr.type = cpu_to_le16(RNBD_MSG_SESS_INFO);
	msg.ver      = RNBD_PROTO_VER_MAJOR;

	if (!rnbd_clt_get_sess(sess)) {
		/*
		 * That can happen only in one case, when RTRS has re-established
		 * the connection and link_ev() is called, but the session is
		 * almost dead, the last reference on the session has been put
		 * and the caller is waiting for RTRS to close everything.
		 */
		err = -ENODEV;
		goto put_iu;
	}
	err = send_usr_msg(sess->rtrs, READ, iu,
			   &vec, sizeof(*rsp), iu->sgt.sgl, 1,
			   msg_sess_info_conf, &errno, wait);
	if (err) {
		rnbd_clt_put_sess(sess);
put_iu:
		rnbd_put_iu(sess, iu);
		kfree(rsp);
	} else {
		err = errno;
	}

	rnbd_put_iu(sess, iu);
	return err;
}

static void set_dev_states_to_disconnected(struct rnbd_clt_session *sess)
{
	struct rnbd_clt_dev *dev;
	struct kobject *gd_kobj;

	mutex_lock(&sess->lock);
	list_for_each_entry(dev, &sess->devs_list, list) {
		rnbd_clt_err(dev, "Device disconnected.\n");

		mutex_lock(&dev->lock);
		if (dev->dev_state == DEV_STATE_MAPPED) {
			dev->dev_state = DEV_STATE_MAPPED_DISCONNECTED;
			gd_kobj = &disk_to_dev(dev->gd)->kobj;
			kobject_uevent(gd_kobj, KOBJ_OFFLINE);
		}
		mutex_unlock(&dev->lock);
	}
	mutex_unlock(&sess->lock);
}
static void remap_devs(struct rnbd_clt_session *sess)
{
	struct rnbd_clt_dev *dev;
	struct rtrs_attrs attrs;
	int err;

	/*
	 * Careful here: we are called from RTRS link event directly,
	 * thus we can't send any RTRS request and wait for response
	 * or RTRS will not be able to complete request with failure
	 * if something goes wrong (failing of outstanding requests
	 * happens exactly from the context where we are blocking now).
	 *
	 * So to avoid deadlocks each usr message sent from here must
	 * be asynchronous.
	 */

	err = send_msg_sess_info(sess, RTRS_PERMIT_NOWAIT);
	if (err) {
		pr_err("send_msg_sess_info(\"%s\"): %d\n", sess->sessname, err);
		return;
	}

	err = rtrs_clt_query(sess->rtrs, &attrs);
	if (err) {
		pr_err("rtrs_clt_query(\"%s\"): %d\n", sess->sessname, err);
		return;
	}
	mutex_lock(&sess->lock);
	sess->max_io_size = attrs.max_io_size;

	list_for_each_entry(dev, &sess->devs_list, list) {
		bool skip;

		mutex_lock(&dev->lock);
		skip = (dev->dev_state == DEV_STATE_INIT);
		mutex_unlock(&dev->lock);
		if (skip)
			/*
			 * When the device is establishing a connection for the
			 * first time - do not remap, it will be closed soon.
			 */
			continue;

		rnbd_clt_info(dev, "session reconnected, remapping device\n");
		err = send_msg_open(dev, RTRS_PERMIT_NOWAIT);
		if (err) {
			rnbd_clt_err(dev, "send_msg_open(): %d\n", err);
			break;
		}
	}
	mutex_unlock(&sess->lock);
}

static void rnbd_clt_link_ev(void *priv, enum rtrs_clt_link_ev ev)
{
	struct rnbd_clt_session *sess = priv;

	switch (ev) {
	case RTRS_CLT_LINK_EV_DISCONNECTED:
		set_dev_states_to_disconnected(sess);
		break;
	case RTRS_CLT_LINK_EV_RECONNECTED:
		remap_devs(sess);
		break;
	default:
		pr_err("Unknown session event received (%d), session: %s\n",
		       ev, sess->sessname);
	}
}
static void rnbd_init_cpu_qlists(struct rnbd_cpu_qlist __percpu *cpu_queues)
{
	unsigned int cpu;
	struct rnbd_cpu_qlist *cpu_q;

	for_each_possible_cpu(cpu) {
		cpu_q = per_cpu_ptr(cpu_queues, cpu);

		cpu_q->cpu = cpu;
		INIT_LIST_HEAD(&cpu_q->requeue_list);
		spin_lock_init(&cpu_q->requeue_lock);
	}
}

static void destroy_mq_tags(struct rnbd_clt_session *sess)
{
	if (sess->tag_set.tags)
		blk_mq_free_tag_set(&sess->tag_set);
}

static inline void wake_up_rtrs_waiters(struct rnbd_clt_session *sess)
{
	sess->rtrs_ready = true;
	wake_up_all(&sess->rtrs_waitq);
}

static void close_rtrs(struct rnbd_clt_session *sess)
{
	might_sleep();

	if (!IS_ERR_OR_NULL(sess->rtrs)) {
		rtrs_clt_close(sess->rtrs);
		sess->rtrs = NULL;
		wake_up_rtrs_waiters(sess);
	}
}

static void free_sess(struct rnbd_clt_session *sess)
{
	WARN_ON(!list_empty(&sess->devs_list));

	might_sleep();

	close_rtrs(sess);
	destroy_mq_tags(sess);
	if (!list_empty(&sess->list)) {
		mutex_lock(&sess_lock);
		list_del(&sess->list);
		mutex_unlock(&sess_lock);
	}
	free_percpu(sess->cpu_queues);
	free_percpu(sess->cpu_rr);
	mutex_destroy(&sess->lock);
	kfree(sess);
}
static struct rnbd_clt_session *alloc_sess(const char *sessname)
{
	struct rnbd_clt_session *sess;
	int err, cpu;

	sess = kzalloc_node(sizeof(*sess), GFP_KERNEL, NUMA_NO_NODE);
	if (!sess)
		return ERR_PTR(-ENOMEM);
	strscpy(sess->sessname, sessname, sizeof(sess->sessname));
	atomic_set(&sess->busy, 0);
	mutex_init(&sess->lock);
	INIT_LIST_HEAD(&sess->devs_list);
	INIT_LIST_HEAD(&sess->list);
	bitmap_zero(sess->cpu_queues_bm, num_possible_cpus());
	init_waitqueue_head(&sess->rtrs_waitq);
	refcount_set(&sess->refcount, 1);

	sess->cpu_queues = alloc_percpu(struct rnbd_cpu_qlist);
	if (!sess->cpu_queues) {
		err = -ENOMEM;
		goto err;
	}
	rnbd_init_cpu_qlists(sess->cpu_queues);

	/*
	 * That is a simple percpu variable which stores cpu indices, which are
	 * incremented on each access.  We need that for the sake of fairness
	 * to wake up queues in a round-robin manner.
	 */
	sess->cpu_rr = alloc_percpu(int);
	if (!sess->cpu_rr) {
		err = -ENOMEM;
		goto err;
	}
	for_each_possible_cpu(cpu)
		* per_cpu_ptr(sess->cpu_rr, cpu) = cpu;

	return sess;

err:
	free_sess(sess);

	return ERR_PTR(err);
}

static int wait_for_rtrs_connection(struct rnbd_clt_session *sess)
{
	wait_event(sess->rtrs_waitq, sess->rtrs_ready);
	if (IS_ERR_OR_NULL(sess->rtrs))
		return -ECONNRESET;

	return 0;
}

static void wait_for_rtrs_disconnection(struct rnbd_clt_session *sess)
	__releases(&sess_lock)
	__acquires(&sess_lock)
{
	DEFINE_WAIT(wait);

	prepare_to_wait(&sess->rtrs_waitq, &wait, TASK_UNINTERRUPTIBLE);
	if (IS_ERR_OR_NULL(sess->rtrs)) {
		finish_wait(&sess->rtrs_waitq, &wait);
		return;
	}
	mutex_unlock(&sess_lock);
	/* loop in caller, see __find_and_get_sess().
	 * You can't leave the mutex locked and call schedule(): you will
	 * deadlock with a caller of free_sess(), which has just put the last
	 * reference and is about to take sess_lock in order to delete the
	 * session from the list.
	 */
	schedule();
	mutex_lock(&sess_lock);
}
static struct rnbd_clt_session *__find_and_get_sess(const char *sessname)
	__releases(&sess_lock)
	__acquires(&sess_lock)
{
	struct rnbd_clt_session *sess, *sn;
	int err;

again:
	list_for_each_entry_safe(sess, sn, &sess_list, list) {
		if (strcmp(sessname, sess->sessname))
			continue;

		if (sess->rtrs_ready && IS_ERR_OR_NULL(sess->rtrs))
			/*
			 * No RTRS connection, session is dying.
			 */
			continue;

		if (rnbd_clt_get_sess(sess)) {
			/*
			 * Alive session is found, wait for RTRS connection.
			 */
			mutex_unlock(&sess_lock);
			err = wait_for_rtrs_connection(sess);
			if (err)
				rnbd_clt_put_sess(sess);
			mutex_lock(&sess_lock);

			if (err)
				/* Session is dying, repeat the loop */
				goto again;

			return sess;
		}
		/*
		 * Ref is 0, session is dying, wait for RTRS disconnect
		 * in order to avoid session name clashes.
		 */
		wait_for_rtrs_disconnection(sess);
		/*
		 * RTRS is disconnected and soon the session will be freed,
		 * so repeat the loop.
		 */
		goto again;
	}

	return NULL;
}

/* caller is responsible for initializing 'first' to false */
static struct rnbd_clt_session *find_or_create_sess(const char *sessname,
						    bool *first)
{
	struct rnbd_clt_session *sess = NULL;

	mutex_lock(&sess_lock);
	sess = __find_and_get_sess(sessname);
	if (!sess) {
		sess = alloc_sess(sessname);
		if (IS_ERR(sess)) {
			mutex_unlock(&sess_lock);
			return sess;
		}
		list_add(&sess->list, &sess_list);
		*first = true;
	}
	mutex_unlock(&sess_lock);

	return sess;
}
static int rnbd_client_open(struct gendisk *disk, blk_mode_t mode)
{
	struct rnbd_clt_dev *dev = disk->private_data;

	if (get_disk_ro(dev->gd) && (mode & BLK_OPEN_WRITE))
		return -EPERM;

	if (dev->dev_state == DEV_STATE_UNMAPPED ||
	    !rnbd_clt_get_dev(dev))
		return -EIO;

	return 0;
}

static void rnbd_client_release(struct gendisk *gen)
{
	struct rnbd_clt_dev *dev = gen->private_data;

	rnbd_clt_put_dev(dev);
}

static int rnbd_client_getgeo(struct block_device *block_device,
			      struct hd_geometry *geo)
{
	u64 size;
	struct rnbd_clt_dev *dev = block_device->bd_disk->private_data;
	struct queue_limits *limit = &dev->queue->limits;

	size = dev->size * (limit->logical_block_size / SECTOR_SIZE);
	geo->cylinders = size >> 6;	/* size/64 */
	geo->heads     = 4;
	geo->sectors   = 16;
	geo->start     = 0;

	return 0;
}

static const struct block_device_operations rnbd_client_ops = {
	.owner		= THIS_MODULE,
	.open		= rnbd_client_open,
	.release	= rnbd_client_release,
	.getgeo		= rnbd_client_getgeo
};
/* The amount of data that belongs to an I/O and the amount of data that
 * should be read or written to the disk (bi_size) can differ.
 *
 * E.g. When WRITE_SAME is used, only a small amount of data is
 * transferred that is then written repeatedly over a lot of sectors.
 *
 * Get the size of data to be transferred via RTRS by summing up the size
 * of the scatter-gather list entries.
 */
static size_t rnbd_clt_get_sg_size(struct scatterlist *sglist, u32 len)
{
	struct scatterlist *sg;
	size_t tsize = 0;
	int i;

	for_each_sg(sglist, sg, len, i)
		tsize += sg->length;

	return tsize;
}

static int rnbd_client_xfer_request(struct rnbd_clt_dev *dev,
				     struct request *rq,
				     struct rnbd_iu *iu)
{
	struct rtrs_clt_sess *rtrs = dev->sess->rtrs;
	struct rtrs_permit *permit = iu->permit;
	struct rnbd_msg_io msg;
	struct rtrs_clt_req_ops req_ops;
	unsigned int sg_cnt = 0;
	struct kvec vec;
	size_t size;
	int err;

	iu->rq		= rq;
	iu->dev		= dev;
	msg.sector	= cpu_to_le64(blk_rq_pos(rq));
	msg.bi_size	= cpu_to_le32(blk_rq_bytes(rq));
	msg.rw		= cpu_to_le32(rq_to_rnbd_flags(rq));
	msg.prio	= cpu_to_le16(req_get_ioprio(rq));

	/*
	 * We only support discards/WRITE_ZEROES with single segment for now.
	 * See queue limits.
	 */
	if ((req_op(rq) != REQ_OP_DISCARD) && (req_op(rq) != REQ_OP_WRITE_ZEROES))
		sg_cnt = blk_rq_map_sg(dev->queue, rq, iu->sgt.sgl);

	if (sg_cnt == 0)
		sg_mark_end(&iu->sgt.sgl[0]);

	msg.hdr.type	= cpu_to_le16(RNBD_MSG_IO);
	msg.device_id	= cpu_to_le32(dev->device_id);

	vec = (struct kvec) {
		.iov_base = &msg,
		.iov_len  = sizeof(msg)
	};
	size = rnbd_clt_get_sg_size(iu->sgt.sgl, sg_cnt);
	req_ops = (struct rtrs_clt_req_ops) {
		.priv = iu,
		.conf_fn = msg_io_conf,
	};
	err = rtrs_clt_request(rq_data_dir(rq), &req_ops, rtrs, permit,
			       &vec, 1, size, iu->sgt.sgl, sg_cnt);
	if (err) {
		rnbd_clt_err_rl(dev, "RTRS failed to transfer IO, err: %d\n",
				err);
		return err;
	}

	return 0;
}
/**
 * rnbd_clt_dev_add_to_requeue() - add device to requeue if session is busy
 * @dev:	Device to be checked
 * @q:		Queue to be added to the requeue list if required
 *
 * Description:
 *     If the session is busy, that means someone will requeue us when
 *     resources are freed.  If the session is not doing anything - the device
 *     is not added to the list and %false is returned.
 */
static bool rnbd_clt_dev_add_to_requeue(struct rnbd_clt_dev *dev,
					struct rnbd_queue *q)
{
	struct rnbd_clt_session *sess = dev->sess;
	struct rnbd_cpu_qlist *cpu_q;
	unsigned long flags;
	bool added = true;
	bool need_set;

	cpu_q = get_cpu_ptr(sess->cpu_queues);
	spin_lock_irqsave(&cpu_q->requeue_lock, flags);

	if (!test_and_set_bit_lock(0, &q->in_list)) {
		if (WARN_ON(!list_empty(&q->requeue_list)))
			goto unlock;

		need_set = !test_bit(cpu_q->cpu, sess->cpu_queues_bm);
		if (need_set) {
			set_bit(cpu_q->cpu, sess->cpu_queues_bm);
			/* Paired with rnbd_put_permit(). Set a bit first
			 * and then observe the busy counter.
			 */
			smp_mb__before_atomic();
		}
		if (atomic_read(&sess->busy)) {
			list_add_tail(&q->requeue_list, &cpu_q->requeue_list);
		} else {
			/* Very unlikely, but possible: busy counter was
			 * observed as zero.  Drop all bits and return
			 * false to restart the queue by ourselves.
			 */
			if (need_set)
				clear_bit(cpu_q->cpu, sess->cpu_queues_bm);
			clear_bit_unlock(0, &q->in_list);
			added = false;
		}
	}
unlock:
	spin_unlock_irqrestore(&cpu_q->requeue_lock, flags);
	put_cpu_ptr(sess->cpu_queues);

	return added;
}
static void rnbd_clt_dev_kick_mq_queue(struct rnbd_clt_dev *dev,
				       struct blk_mq_hw_ctx *hctx,
				       int delay)
{
	struct rnbd_queue *q = hctx->driver_data;

	if (delay != RNBD_DELAY_IFBUSY)
		blk_mq_delay_run_hw_queue(hctx, delay);
	else if (!rnbd_clt_dev_add_to_requeue(dev, q))
		/*
		 * If the session is not busy we have to restart
		 * the queue ourselves.
		 */
		blk_mq_delay_run_hw_queue(hctx, 10/*ms*/);
}
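
/*
 * blk-mq .queue_rq: grab an RTRS permit, map the request to a scatterlist and
 * hand it to rnbd_client_xfer_request().  If no permit is available the queue
 * is parked (RNBD_DELAY_IFBUSY) and rerun once a permit is put back.
 */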
static blk_status_t rnbd_queue_rq(struct blk_mq_hw_ctx *hctx,
				  const struct blk_mq_queue_data *bd)
{
	struct request *rq = bd->rq;
	struct rnbd_clt_dev *dev = rq->q->disk->private_data;
	struct rnbd_iu *iu = blk_mq_rq_to_pdu(rq);
	int err;
	blk_status_t ret = BLK_STS_IOERR;

	if (dev->dev_state != DEV_STATE_MAPPED)
		return BLK_STS_IOERR;

	iu->permit = rnbd_get_permit(dev->sess, RTRS_IO_CON,
				     RTRS_PERMIT_NOWAIT);
	if (!iu->permit) {
		rnbd_clt_dev_kick_mq_queue(dev, hctx, RNBD_DELAY_IFBUSY);
		return BLK_STS_RESOURCE;
	}

	iu->sgt.sgl = iu->first_sgl;
	err = sg_alloc_table_chained(&iu->sgt,
				     /* Even if the request has no segment,
				      * the sglist must have at least one entry.
				      */
				     blk_rq_nr_phys_segments(rq) ? : 1,
				     iu->sgt.sgl,
				     RNBD_INLINE_SG_CNT);
	if (err) {
		rnbd_clt_err_rl(dev, "sg_alloc_table_chained ret=%d\n", err);
		rnbd_clt_dev_kick_mq_queue(dev, hctx, 10/*ms*/);
		rnbd_put_permit(dev->sess, iu->permit);
		return BLK_STS_RESOURCE;
	}

	blk_mq_start_request(rq);
	err = rnbd_client_xfer_request(dev, rq, iu);
	if (err == 0)
		return BLK_STS_OK;
	if (err == -EAGAIN || err == -ENOMEM) {
		rnbd_clt_dev_kick_mq_queue(dev, hctx, 10/*ms*/);
		ret = BLK_STS_RESOURCE;
	}
	sg_free_table_chained(&iu->sgt, RNBD_INLINE_SG_CNT);
	rnbd_put_permit(dev->sess, iu->permit);
	return ret;
}
static int rnbd_rdma_poll(struct blk_mq_hw_ctx *hctx, struct io_comp_batch *iob)
{
	struct rnbd_queue *q = hctx->driver_data;
	struct rnbd_clt_dev *dev = q->dev;

	return rtrs_clt_rdma_cq_direct(dev->sess->rtrs, hctx->queue_num);
}

static void rnbd_rdma_map_queues(struct blk_mq_tag_set *set)
{
	struct rnbd_clt_session *sess = set->driver_data;

	/* shared read/write queues */
	set->map[HCTX_TYPE_DEFAULT].nr_queues = num_online_cpus();
	set->map[HCTX_TYPE_DEFAULT].queue_offset = 0;
	set->map[HCTX_TYPE_READ].nr_queues = num_online_cpus();
	set->map[HCTX_TYPE_READ].queue_offset = 0;
	blk_mq_map_queues(&set->map[HCTX_TYPE_DEFAULT]);
	blk_mq_map_queues(&set->map[HCTX_TYPE_READ]);

	if (sess->nr_poll_queues) {
		/* dedicated queue for poll */
		set->map[HCTX_TYPE_POLL].nr_queues = sess->nr_poll_queues;
		set->map[HCTX_TYPE_POLL].queue_offset = set->map[HCTX_TYPE_READ].queue_offset +
			set->map[HCTX_TYPE_READ].nr_queues;
		blk_mq_map_queues(&set->map[HCTX_TYPE_POLL]);
		pr_info("[session=%s] mapped %d/%d/%d default/read/poll queues.\n",
			sess->sessname,
			set->map[HCTX_TYPE_DEFAULT].nr_queues,
			set->map[HCTX_TYPE_READ].nr_queues,
			set->map[HCTX_TYPE_POLL].nr_queues);
	} else {
		pr_info("[session=%s] mapped %d/%d default/read queues.\n",
			sess->sessname,
			set->map[HCTX_TYPE_DEFAULT].nr_queues,
			set->map[HCTX_TYPE_READ].nr_queues);
	}
}

static struct blk_mq_ops rnbd_mq_ops = {
	.queue_rq	= rnbd_queue_rq,
	.complete	= rnbd_softirq_done_fn,
	.map_queues	= rnbd_rdma_map_queues,
	.poll		= rnbd_rdma_poll,
};
static int setup_mq_tags(struct rnbd_clt_session *sess)
{
	struct blk_mq_tag_set *tag_set = &sess->tag_set;

	memset(tag_set, 0, sizeof(*tag_set));
	tag_set->ops		= &rnbd_mq_ops;
	tag_set->queue_depth	= sess->queue_depth;
	tag_set->numa_node	= NUMA_NO_NODE;
	tag_set->flags		= BLK_MQ_F_SHOULD_MERGE |
				  BLK_MQ_F_TAG_QUEUE_SHARED;
	tag_set->cmd_size	= sizeof(struct rnbd_iu) + RNBD_RDMA_SGL_SIZE;

	/* for HCTX_TYPE_DEFAULT, HCTX_TYPE_READ, HCTX_TYPE_POLL */
	tag_set->nr_maps	= sess->nr_poll_queues ? HCTX_MAX_TYPES : 2;
	/*
	 * HCTX_TYPE_DEFAULT and HCTX_TYPE_READ share one set of queues,
	 * the others are for HCTX_TYPE_POLL
	 */
	tag_set->nr_hw_queues	= num_online_cpus() + sess->nr_poll_queues;
	tag_set->driver_data	= sess;

	return blk_mq_alloc_tag_set(tag_set);
}
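
/*
 * Find an existing session by name or create a new one: open the RTRS
 * connection, query its attributes, set up the shared tag set and exchange
 * RNBD_MSG_SESS_INFO before any device can be mapped on it.
 */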
static struct rnbd_clt_session *
find_and_get_or_create_sess(const char *sessname,
			    const struct rtrs_addr *paths,
			    size_t path_cnt, u16 port_nr, u32 nr_poll_queues)
{
	struct rnbd_clt_session *sess;
	struct rtrs_attrs attrs;
	int err;
	bool first = false;
	struct rtrs_clt_ops rtrs_ops;

	sess = find_or_create_sess(sessname, &first);
	if (sess == ERR_PTR(-ENOMEM)) {
		return ERR_PTR(-ENOMEM);
	} else if ((nr_poll_queues && !first) || (!nr_poll_queues && sess->nr_poll_queues)) {
		/*
		 * A device MUST have its own session to use polling mode.
		 * Mapping a new device with an existing session must fail.
		 */
		err = -EINVAL;
		goto put_sess;
	}

	if (!first)
		return sess;

	if (!path_cnt) {
		pr_err("Session %s not found, and path parameter not given", sessname);
		err = -ENXIO;
		goto put_sess;
	}

	rtrs_ops = (struct rtrs_clt_ops) {
		.priv = sess,
		.link_ev = rnbd_clt_link_ev,
	};
	/*
	 * Nothing was found, establish rtrs connection and proceed further.
	 */
	sess->rtrs = rtrs_clt_open(&rtrs_ops, sessname,
				   paths, path_cnt, port_nr,
				   0, /* Do not use pdu of rtrs */
				   RECONNECT_DELAY,
				   MAX_RECONNECTS, nr_poll_queues);
	if (IS_ERR(sess->rtrs)) {
		err = PTR_ERR(sess->rtrs);
		goto wake_up_and_put;
	}

	err = rtrs_clt_query(sess->rtrs, &attrs);
	if (err)
		goto close_rtrs;

	sess->max_io_size = attrs.max_io_size;
	sess->queue_depth = attrs.queue_depth;
	sess->nr_poll_queues = nr_poll_queues;
	sess->max_segments = attrs.max_segments;

	err = setup_mq_tags(sess);
	if (err)
		goto close_rtrs;

	err = send_msg_sess_info(sess, RTRS_PERMIT_WAIT);
	if (err)
		goto close_rtrs;

	wake_up_rtrs_waiters(sess);

	return sess;

close_rtrs:
	close_rtrs(sess);
put_sess:
	rnbd_clt_put_sess(sess);

	return ERR_PTR(err);

wake_up_and_put:
	wake_up_rtrs_waiters(sess);
	goto put_sess;
}

static inline void rnbd_init_hw_queue(struct rnbd_clt_dev *dev,
				      struct rnbd_queue *q,
				      struct blk_mq_hw_ctx *hctx)
{
	INIT_LIST_HEAD(&q->requeue_list);
	q->dev  = dev;
	q->hctx = hctx;
}

static void rnbd_init_mq_hw_queues(struct rnbd_clt_dev *dev)
{
	unsigned long i;
	struct blk_mq_hw_ctx *hctx;
	struct rnbd_queue *q;

	queue_for_each_hw_ctx(dev->queue, hctx, i) {
		q = &dev->hw_queues[i];
		rnbd_init_hw_queue(dev, q, hctx);
		hctx->driver_data = q;
	}
}
static int rnbd_clt_setup_gen_disk(struct rnbd_clt_dev *dev,
				   struct rnbd_msg_open_rsp *rsp, int idx)
{
	int err;

	dev->gd->major		= rnbd_client_major;
	dev->gd->first_minor	= idx << RNBD_PART_BITS;
	dev->gd->minors		= 1 << RNBD_PART_BITS;
	dev->gd->fops		= &rnbd_client_ops;
	dev->gd->queue		= dev->queue;
	dev->gd->private_data	= dev;
	snprintf(dev->gd->disk_name, sizeof(dev->gd->disk_name), "rnbd%d",
		 idx);
	pr_debug("disk_name=%s, capacity=%llu\n",
		 dev->gd->disk_name,
		 le64_to_cpu(rsp->nsectors) *
		 (le16_to_cpu(rsp->logical_block_size) / SECTOR_SIZE));

	set_capacity(dev->gd, le64_to_cpu(rsp->nsectors));

	if (dev->access_mode == RNBD_ACCESS_RO)
		set_disk_ro(dev->gd, true);

	err = add_disk(dev->gd);
	if (err)
		put_disk(dev->gd);

	return err;
}
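
/*
 * Translate the server's open response into block-layer queue limits,
 * allocate the gendisk on the session's shared tag set and register it.
 */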
static int rnbd_client_setup_device(struct rnbd_clt_dev *dev,
				    struct rnbd_msg_open_rsp *rsp)
{
	struct queue_limits lim = {
		.logical_block_size	= le16_to_cpu(rsp->logical_block_size),
		.physical_block_size	= le16_to_cpu(rsp->physical_block_size),
		.io_opt			= dev->sess->max_io_size,
		.max_hw_sectors		= dev->sess->max_io_size / SECTOR_SIZE,
		.max_hw_discard_sectors	= le32_to_cpu(rsp->max_discard_sectors),
		.discard_granularity	= le32_to_cpu(rsp->discard_granularity),
		.discard_alignment	= le32_to_cpu(rsp->discard_alignment),
		.max_segments		= dev->sess->max_segments,
		.virt_boundary_mask	= SZ_4K - 1,
		.max_write_zeroes_sectors =
			le32_to_cpu(rsp->max_write_zeroes_sectors),
	};
	int idx = dev->clt_device_id;

	dev->size = le64_to_cpu(rsp->nsectors) *
			le16_to_cpu(rsp->logical_block_size);

	if (rsp->secure_discard) {
		lim.max_secure_erase_sectors =
			le32_to_cpu(rsp->max_discard_sectors);
	}

	if (rsp->cache_policy & RNBD_WRITEBACK) {
		lim.features |= BLK_FEAT_WRITE_CACHE;
		if (rsp->cache_policy & RNBD_FUA)
			lim.features |= BLK_FEAT_FUA;
	}

	dev->gd = blk_mq_alloc_disk(&dev->sess->tag_set, &lim, dev);
	if (IS_ERR(dev->gd))
		return PTR_ERR(dev->gd);
	dev->queue = dev->gd->queue;
	rnbd_init_mq_hw_queues(dev);

	return rnbd_clt_setup_gen_disk(dev, rsp, idx);
}
static struct rnbd_clt_dev *init_dev(struct rnbd_clt_session *sess,
				     enum rnbd_access_mode access_mode,
				     const char *pathname,
				     u32 nr_poll_queues)
{
	struct rnbd_clt_dev *dev;
	int ret;

	dev = kzalloc_node(sizeof(*dev), GFP_KERNEL, NUMA_NO_NODE);
	if (!dev)
		return ERR_PTR(-ENOMEM);

	/*
	 * nr_cpu_ids: the number of softirq queues
	 * nr_poll_queues: the number of polling queues
	 */
	dev->hw_queues = kcalloc(nr_cpu_ids + nr_poll_queues,
				 sizeof(*dev->hw_queues),
				 GFP_KERNEL);
	if (!dev->hw_queues) {
		ret = -ENOMEM;
		goto out_alloc;
	}

	ret = ida_alloc_max(&index_ida, (1 << (MINORBITS - RNBD_PART_BITS)) - 1,
			    GFP_KERNEL);
	if (ret < 0) {
		pr_err("Failed to initialize device '%s' from session %s, allocating idr failed, err: %d\n",
		       pathname, sess->sessname, ret);
		goto out_queues;
	}

	dev->pathname = kstrdup(pathname, GFP_KERNEL);
	if (!dev->pathname) {
		ret = -ENOMEM;
		goto out_queues;
	}

	dev->clt_device_id	= ret;
	dev->sess		= sess;
	dev->access_mode	= access_mode;
	dev->nr_poll_queues	= nr_poll_queues;
	mutex_init(&dev->lock);
	refcount_set(&dev->refcount, 1);
	dev->dev_state = DEV_STATE_INIT;

	/*
	 * Here we are called from a sysfs entry, thus clt-sysfs is
	 * responsible that the session will not disappear.
	 */
	WARN_ON(!rnbd_clt_get_sess(sess));

	return dev;

out_queues:
	kfree(dev->hw_queues);
out_alloc:
	kfree(dev);
	return ERR_PTR(ret);
}

static bool __exists_dev(const char *pathname, const char *sessname)
{
	struct rnbd_clt_session *sess;
	struct rnbd_clt_dev *dev;
	bool found = false;

	list_for_each_entry(sess, &sess_list, list) {
		if (sessname && strncmp(sess->sessname, sessname,
					sizeof(sess->sessname)))
			continue;
		mutex_lock(&sess->lock);
		list_for_each_entry(dev, &sess->devs_list, list) {
			if (strlen(dev->pathname) == strlen(pathname) &&
			    !strcmp(dev->pathname, pathname)) {
				found = true;
				break;
			}
		}
		mutex_unlock(&sess->lock);
		if (found)
			break;
	}

	return found;
}

static bool exists_devpath(const char *pathname, const char *sessname)
{
	bool found;

	mutex_lock(&sess_lock);
	found = __exists_dev(pathname, sessname);
	mutex_unlock(&sess_lock);

	return found;
}

static bool insert_dev_if_not_exists_devpath(struct rnbd_clt_dev *dev)
{
	bool found;
	struct rnbd_clt_session *sess = dev->sess;

	mutex_lock(&sess_lock);
	found = __exists_dev(dev->pathname, sess->sessname);
	if (!found) {
		mutex_lock(&sess->lock);
		list_add_tail(&dev->list, &sess->devs_list);
		mutex_unlock(&sess->lock);
	}
	mutex_unlock(&sess_lock);

	return found;
}

static void delete_dev(struct rnbd_clt_dev *dev)
{
	struct rnbd_clt_session *sess = dev->sess;

	mutex_lock(&sess->lock);
	list_del(&dev->list);
	mutex_unlock(&sess->lock);
}
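
/*
 * Map a remote device: find or create the session, register the device under
 * its pathname, send RNBD_MSG_OPEN and set up the gendisk from the response.
 * Returns the new device or an ERR_PTR() on failure.
 */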
struct rnbd_clt_dev *rnbd_clt_map_device(const char *sessname,
					 struct rtrs_addr *paths,
					 size_t path_cnt, u16 port_nr,
					 const char *pathname,
					 enum rnbd_access_mode access_mode,
					 u32 nr_poll_queues)
{
	struct rnbd_clt_session *sess;
	struct rnbd_clt_dev *dev;
	int ret, errno;
	struct rnbd_msg_open_rsp *rsp;
	struct rnbd_msg_open msg;
	struct rnbd_iu *iu;
	struct kvec vec = {
		.iov_base = &msg,
		.iov_len  = sizeof(msg)
	};

	if (exists_devpath(pathname, sessname))
		return ERR_PTR(-EEXIST);

	sess = find_and_get_or_create_sess(sessname, paths, path_cnt, port_nr, nr_poll_queues);
	if (IS_ERR(sess))
		return ERR_CAST(sess);

	dev = init_dev(sess, access_mode, pathname, nr_poll_queues);
	if (IS_ERR(dev)) {
		pr_err("map_device: failed to map device '%s' from session %s, can't initialize device, err: %pe\n",
		       pathname, sess->sessname, dev);
		ret = PTR_ERR(dev);
		goto put_sess;
	}

	if (insert_dev_if_not_exists_devpath(dev)) {
		ret = -EEXIST;
		goto put_dev;
	}

	rsp = kzalloc(sizeof(*rsp), GFP_KERNEL);
	if (!rsp) {
		ret = -ENOMEM;
		goto del_dev;
	}

	iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT);
	if (!iu) {
		ret = -ENOMEM;
		kfree(rsp);
		goto del_dev;
	}
	iu->buf = rsp;
	iu->dev = dev;
	sg_init_one(iu->sgt.sgl, rsp, sizeof(*rsp));

	msg.hdr.type	= cpu_to_le16(RNBD_MSG_OPEN);
	msg.access_mode	= dev->access_mode;
	strscpy(msg.dev_name, dev->pathname, sizeof(msg.dev_name));

	WARN_ON(!rnbd_clt_get_dev(dev));
	ret = send_usr_msg(sess->rtrs, READ, iu,
			   &vec, sizeof(*rsp), iu->sgt.sgl, 1,
			   msg_open_conf, &errno, RTRS_PERMIT_WAIT);
	if (ret) {
		rnbd_clt_put_dev(dev);
		rnbd_put_iu(sess, iu);
	} else {
		ret = errno;
	}
	if (ret) {
		rnbd_clt_err(dev,
			     "map_device: failed, can't open remote device, err: %d\n",
			     ret);
		goto put_iu;
	}
	mutex_lock(&dev->lock);
	pr_debug("Opened remote device: session=%s, path='%s'\n",
		 sess->sessname, pathname);
	ret = rnbd_client_setup_device(dev, rsp);
	if (ret) {
		rnbd_clt_err(dev,
			     "map_device: Failed to configure device, err: %d\n",
			     ret);
		mutex_unlock(&dev->lock);
		goto send_close;
	}

	rnbd_clt_info(dev,
		      "map_device: Device mapped as %s (nsectors: %llu, logical_block_size: %d, physical_block_size: %d, max_write_zeroes_sectors: %d, max_discard_sectors: %d, discard_granularity: %d, discard_alignment: %d, secure_discard: %d, max_segments: %d, max_hw_sectors: %d, wc: %d, fua: %d)\n",
		      dev->gd->disk_name, le64_to_cpu(rsp->nsectors),
		      le16_to_cpu(rsp->logical_block_size),
		      le16_to_cpu(rsp->physical_block_size),
		      le32_to_cpu(rsp->max_write_zeroes_sectors),
		      le32_to_cpu(rsp->max_discard_sectors),
		      le32_to_cpu(rsp->discard_granularity),
		      le32_to_cpu(rsp->discard_alignment),
		      le16_to_cpu(rsp->secure_discard),
		      sess->max_segments, sess->max_io_size / SECTOR_SIZE,
		      !!(rsp->cache_policy & RNBD_WRITEBACK),
		      !!(rsp->cache_policy & RNBD_FUA));

	mutex_unlock(&dev->lock);
	kfree(rsp);
	rnbd_put_iu(sess, iu);
	rnbd_clt_put_sess(sess);

	return dev;

send_close:
	send_msg_close(dev, dev->device_id, RTRS_PERMIT_WAIT);
put_iu:
	rnbd_put_iu(sess, iu);
	kfree(rsp);
del_dev:
	delete_dev(dev);
put_dev:
	rnbd_clt_put_dev(dev);
put_sess:
	rnbd_clt_put_sess(sess);

	return ERR_PTR(ret);
}
static void destroy_gen_disk(struct rnbd_clt_dev *dev)
{
	del_gendisk(dev->gd);
	put_disk(dev->gd);
}

static void destroy_sysfs(struct rnbd_clt_dev *dev,
			  const struct attribute *sysfs_self)
{
	rnbd_clt_remove_dev_symlink(dev);
	if (dev->kobj.state_initialized) {
		if (sysfs_self)
			/* To avoid deadlock, first remove itself */
			sysfs_remove_file_self(&dev->kobj, sysfs_self);
		kobject_del(&dev->kobj);
		kobject_put(&dev->kobj);
	}
}

int rnbd_clt_unmap_device(struct rnbd_clt_dev *dev, bool force,
			  const struct attribute *sysfs_self)
{
	struct rnbd_clt_session *sess = dev->sess;
	int refcount, ret = 0;
	bool was_mapped;

	mutex_lock(&dev->lock);
	if (dev->dev_state == DEV_STATE_UNMAPPED) {
		rnbd_clt_info(dev, "Device is already being unmapped\n");
		ret = -EALREADY;
		goto err;
	}
	refcount = refcount_read(&dev->refcount);
	if (!force && refcount > 1) {
		rnbd_clt_err(dev,
			     "Closing device failed, device is in use, (%d device users)\n",
			     refcount - 1);
		ret = -EBUSY;
		goto err;
	}
	was_mapped = (dev->dev_state == DEV_STATE_MAPPED);
	dev->dev_state = DEV_STATE_UNMAPPED;
	mutex_unlock(&dev->lock);

	delete_dev(dev);
	destroy_sysfs(dev, sysfs_self);
	destroy_gen_disk(dev);
	if (was_mapped && sess->rtrs)
		send_msg_close(dev, dev->device_id, RTRS_PERMIT_WAIT);

	rnbd_clt_info(dev, "Device is unmapped\n");

	/* Likely last reference put */
	rnbd_clt_put_dev(dev);

	/*
	 * Here the device and the session can have vanished!
	 */

	return 0;
err:
	mutex_unlock(&dev->lock);

	return ret;
}
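
/*
 * Re-send RNBD_MSG_OPEN for a device whose session reconnected; this only
 * makes sense while the device is in DEV_STATE_MAPPED_DISCONNECTED.
 */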
int rnbd_clt_remap_device(struct rnbd_clt_dev *dev)
{
	int err;

	mutex_lock(&dev->lock);
	if (dev->dev_state == DEV_STATE_MAPPED_DISCONNECTED)
		err = 0;
	else if (dev->dev_state == DEV_STATE_UNMAPPED)
		err = -ENODEV;
	else if (dev->dev_state == DEV_STATE_MAPPED)
		err = -EALREADY;
	else
		err = -EBUSY;
	mutex_unlock(&dev->lock);
	if (!err) {
		rnbd_clt_info(dev, "Remapping device.\n");
		err = send_msg_open(dev, RTRS_PERMIT_WAIT);
		if (err)
			rnbd_clt_err(dev, "remap_device: %d\n", err);
	}

	return err;
}

static void unmap_device_work(struct work_struct *work)
{
	struct rnbd_clt_dev *dev;

	dev = container_of(work, typeof(*dev), unmap_on_rmmod_work);
	rnbd_clt_unmap_device(dev, true, NULL);
}
static void rnbd_destroy_sessions(void)
{
	struct rnbd_clt_session *sess, *sn;
	struct rnbd_clt_dev *dev, *tn;

	/* Firstly forbid access through sysfs interface */
	rnbd_clt_destroy_sysfs_files();

	/*
	 * At this point there is no concurrent access to the sessions list
	 * and the devices list:
	 *   1. New session or device can't be created - session sysfs files
	 *      are removed.
	 *   2. Device or session can't be removed - module reference is taken
	 *      into account in unmap device sysfs callback.
	 *   3. No IO requests inflight - each file open of block_dev increases
	 *      module reference in get_disk().
	 *
	 * But there can still be user requests in flight, which are sent by
	 * asynchronous send_msg_*() functions, thus before unmapping devices
	 * the RTRS session must be explicitly closed.
	 */

	list_for_each_entry_safe(sess, sn, &sess_list, list) {
		if (!rnbd_clt_get_sess(sess))
			continue;
		close_rtrs(sess);
		list_for_each_entry_safe(dev, tn, &sess->devs_list, list) {
			/*
			 * Here unmap happens in parallel for only one reason:
			 * del_gendisk() takes around half a second, so with a
			 * huge number of devices the whole module unload
			 * procedure would take minutes.
			 */
			INIT_WORK(&dev->unmap_on_rmmod_work, unmap_device_work);
			queue_work(rnbd_clt_wq, &dev->unmap_on_rmmod_work);
		}
		rnbd_clt_put_sess(sess);
	}
	/* Wait for all scheduled unmap works */
	flush_workqueue(rnbd_clt_wq);
	WARN_ON(!list_empty(&sess_list));
}

static int __init rnbd_client_init(void)
{
	int err = 0;

	BUILD_BUG_ON(sizeof(struct rnbd_msg_hdr) != 4);
	BUILD_BUG_ON(sizeof(struct rnbd_msg_sess_info) != 36);
	BUILD_BUG_ON(sizeof(struct rnbd_msg_sess_info_rsp) != 36);
	BUILD_BUG_ON(sizeof(struct rnbd_msg_open) != 264);
	BUILD_BUG_ON(sizeof(struct rnbd_msg_close) != 8);
	BUILD_BUG_ON(sizeof(struct rnbd_msg_open_rsp) != 56);
	rnbd_client_major = register_blkdev(rnbd_client_major, "rnbd");
	if (rnbd_client_major <= 0) {
		pr_err("Failed to load module, block device registration failed\n");
		return -EBUSY;
	}

	err = rnbd_clt_create_sysfs_files();
	if (err) {
		pr_err("Failed to load module, creating sysfs device files failed, err: %d\n",
		       err);
		unregister_blkdev(rnbd_client_major, "rnbd");
		return err;
	}

	rnbd_clt_wq = alloc_workqueue("rnbd_clt_wq", 0, 0);
	if (!rnbd_clt_wq) {
		pr_err("Failed to load module, alloc_workqueue failed.\n");
		rnbd_clt_destroy_sysfs_files();
		unregister_blkdev(rnbd_client_major, "rnbd");
		err = -ENOMEM;
	}

	return err;
}

static void __exit rnbd_client_exit(void)
{
	rnbd_destroy_sessions();
	unregister_blkdev(rnbd_client_major, "rnbd");
	ida_destroy(&index_ida);
	destroy_workqueue(rnbd_clt_wq);
}

module_init(rnbd_client_init);
module_exit(rnbd_client_exit);