// SPDX-License-Identifier: GPL-2.0
/*
 * Shared Memory Communications over RDMA (SMC-R) and RoCE
 *
 * Connection Data Control (CDC)
 *
 * Copyright IBM Corp. 2016
 *
 * Author(s): Ursula Braun <ubraun@linux.vnet.ibm.com>
 */

#include <linux/spinlock.h>

#include "smc.h"
#include "smc_wr.h"
#include "smc_cdc.h"
#include "smc_tx.h"
#include "smc_rx.h"
#include "smc_close.h"

/********************************** send *************************************/

/* handler for send/transmission completion of a CDC msg */
static void smc_cdc_tx_handler(struct smc_wr_tx_pend_priv *pnd_snd,
			       struct smc_link *link,
			       enum ib_wc_status wc_status)
{
	struct smc_cdc_tx_pend *cdcpend = (struct smc_cdc_tx_pend *)pnd_snd;
	struct smc_connection *conn = cdcpend->conn;
	struct smc_buf_desc *sndbuf_desc;
	struct smc_sock *smc;
	int diff;

	sndbuf_desc = conn->sndbuf_desc;
	smc = container_of(conn, struct smc_sock, conn);
	bh_lock_sock(&smc->sk);
	if (!wc_status && sndbuf_desc) {
		diff = smc_curs_diff(sndbuf_desc->len,
				     &cdcpend->conn->tx_curs_fin,
				     &cdcpend->cursor);
		/* sndbuf_space is decreased in smc_sendmsg */
		smp_mb__before_atomic();
		atomic_add(diff, &cdcpend->conn->sndbuf_space);
		/* guarantee 0 <= sndbuf_space <= sndbuf_desc->len */
		smp_mb__after_atomic();
		smc_curs_copy(&conn->tx_curs_fin, &cdcpend->cursor, conn);
		smc_curs_copy(&conn->local_tx_ctrl_fin, &cdcpend->p_cursor,
			      conn);
		conn->tx_cdc_seq_fin = cdcpend->ctrl_seq;
	}

	if (atomic_dec_and_test(&conn->cdc_pend_tx_wr)) {
		/* If user owns the sock_lock, mark that the connection needs
		 * sending. User context will later try to send when it
		 * releases the sock_lock in smc_release_cb().
		 */
		if (sock_owned_by_user(&smc->sk))
			conn->tx_in_release_sock = true;
		else
			smc_tx_pending(conn);

		if (unlikely(wq_has_sleeper(&conn->cdc_pend_tx_wq)))
			wake_up(&conn->cdc_pend_tx_wq);
	}
	WARN_ON(atomic_read(&conn->cdc_pend_tx_wr) < 0);

	smc_tx_sndbuf_nonfull(smc);
	bh_unlock_sock(&smc->sk);
}

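/* reserve a send work request buffer and pend slot on @link for a CDC msg;
 * if the connection is being terminated, a successfully reserved slot is
 * returned again and -EPIPE is reported to the caller
 */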
int smc_cdc_get_free_slot(struct smc_connection *conn,
			  struct smc_link *link,
			  struct smc_wr_buf **wr_buf,
			  struct smc_rdma_wr **wr_rdma_buf,
			  struct smc_cdc_tx_pend **pend)
{
	int rc;

	rc = smc_wr_tx_get_free_slot(link, smc_cdc_tx_handler, wr_buf,
				     wr_rdma_buf,
				     (struct smc_wr_tx_pend_priv **)pend);
	if (conn->killed) {
		/* abnormal termination */
		if (!rc)
			smc_wr_tx_put_slot(link,
					   (struct smc_wr_tx_pend_priv *)(*pend));
		rc = -EPIPE;
	}
	return rc;
}

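/* snapshot the current send cursors and CDC sequence number into the pend
 * slot so that smc_cdc_tx_handler() can account the transfer on completion
 */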
static inline void smc_cdc_add_pending_send(struct smc_connection *conn,
					    struct smc_cdc_tx_pend *pend)
{
	BUILD_BUG_ON_MSG(
		sizeof(struct smc_cdc_msg) > SMC_WR_BUF_SIZE,
		"must increase SMC_WR_BUF_SIZE to at least sizeof(struct smc_cdc_msg)");
	BUILD_BUG_ON_MSG(
		offsetofend(struct smc_cdc_msg, reserved) > SMC_WR_TX_SIZE,
		"must adapt SMC_WR_TX_SIZE to sizeof(struct smc_cdc_msg); if not all smc_wr upper layer protocols use the same message size any more, must start to set link->wr_tx_sges[i].length on each individual smc_wr_tx_send()");
	BUILD_BUG_ON_MSG(
		sizeof(struct smc_cdc_tx_pend) > SMC_WR_TX_PEND_PRIV_SIZE,
		"must increase SMC_WR_TX_PEND_PRIV_SIZE to at least sizeof(struct smc_cdc_tx_pend)");
	pend->conn = conn;
	pend->cursor = conn->tx_curs_sent;
	pend->p_cursor = conn->local_tx_ctrl.prod;
	pend->ctrl_seq = conn->tx_cdc_seq;
}

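/* build a CDC message from the current connection state and post it on the
 * connection's link; the CDC sequence number is rolled back if the post fails
 */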
int smc_cdc_msg_send(struct smc_connection *conn,
		     struct smc_wr_buf *wr_buf,
		     struct smc_cdc_tx_pend *pend)
{
	struct smc_link *link = conn->lnk;
	union smc_host_cursor cfed;
	int rc;

	smc_cdc_add_pending_send(conn, pend);

	conn->tx_cdc_seq++;
	conn->local_tx_ctrl.seqno = conn->tx_cdc_seq;
	smc_host_msg_to_cdc((struct smc_cdc_msg *)wr_buf, conn, &cfed);

	atomic_inc(&conn->cdc_pend_tx_wr);
	smp_mb__after_atomic(); /* Make sure cdc_pend_tx_wr added before post */

	rc = smc_wr_tx_send(link, (struct smc_wr_tx_pend_priv *)pend);
	if (!rc) {
		smc_curs_copy(&conn->rx_curs_confirmed, &cfed, conn);
		conn->local_rx_ctrl.prod_flags.cons_curs_upd_req = 0;
	} else {
		conn->tx_cdc_seq--;
		conn->local_tx_ctrl.seqno = conn->tx_cdc_seq;
		atomic_dec(&conn->cdc_pend_tx_wr);
	}

	return rc;
}

/* send a validation msg indicating the move of a conn to another QP link */
int smcr_cdc_msg_send_validation(struct smc_connection *conn,
				 struct smc_cdc_tx_pend *pend,
				 struct smc_wr_buf *wr_buf)
{
	struct smc_host_cdc_msg *local = &conn->local_tx_ctrl;
	struct smc_link *link = conn->lnk;
	struct smc_cdc_msg *peer;
	int rc;

	peer = (struct smc_cdc_msg *)wr_buf;
	peer->common.type = local->common.type;
	peer->len = local->len;
	peer->seqno = htons(conn->tx_cdc_seq_fin); /* seqno last compl. tx */
	peer->token = htonl(local->token);
	peer->prod_flags.failover_validation = 1;

	/* We need to set pend->conn here to make sure smc_cdc_tx_handler()
	 * can handle it properly
	 */
	smc_cdc_add_pending_send(conn, pend);

	atomic_inc(&conn->cdc_pend_tx_wr);
	smp_mb__after_atomic(); /* Make sure cdc_pend_tx_wr added before post */

	rc = smc_wr_tx_send(link, (struct smc_wr_tx_pend_priv *)pend);
	if (unlikely(rc))
		atomic_dec(&conn->cdc_pend_tx_wr);

	return rc;
}

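/* reserve a WR slot on the connection's current link and send a CDC message;
 * if the connection moved to another link in the meantime, retry once on the
 * new link
 */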
static int smcr_cdc_get_slot_and_msg_send(struct smc_connection *conn)
{
	struct smc_cdc_tx_pend *pend;
	struct smc_wr_buf *wr_buf;
	struct smc_link *link;
	bool again = false;
	int rc;

again:
	link = conn->lnk;
	if (!smc_wr_tx_link_hold(link))
		return -ENOLINK;
	rc = smc_cdc_get_free_slot(conn, link, &wr_buf, NULL, &pend);
	if (rc)
		goto put_out;

	spin_lock_bh(&conn->send_lock);
	if (link != conn->lnk) {
		/* link of connection changed, try again one time */
		spin_unlock_bh(&conn->send_lock);
		smc_wr_tx_put_slot(link,
				   (struct smc_wr_tx_pend_priv *)pend);
		smc_wr_tx_link_put(link);
		if (again)
			return -ENOLINK;
		again = true;
		goto again;
	}
	rc = smc_cdc_msg_send(conn, wr_buf, pend);
	spin_unlock_bh(&conn->send_lock);
put_out:
	smc_wr_tx_link_put(link);
	return rc;
}

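/* send a CDC message for the connection, dispatching to the SMC-D or SMC-R
 * transmit path
 */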
int smc_cdc_get_slot_and_msg_send(struct smc_connection *conn)
{
	int rc;

	if (!smc_conn_lgr_valid(conn) ||
	    (conn->lgr->is_smcd && conn->lgr->peer_shutdown))
		return -EPIPE;

	if (conn->lgr->is_smcd) {
		spin_lock_bh(&conn->send_lock);
		rc = smcd_cdc_msg_send(conn);
		spin_unlock_bh(&conn->send_lock);
	} else {
		rc = smcr_cdc_get_slot_and_msg_send(conn);
	}

	return rc;
}

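/* wait until all CDC messages posted for this connection have completed,
 * i.e. cdc_pend_tx_wr has dropped back to zero
 */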
void smc_cdc_wait_pend_tx_wr(struct smc_connection *conn)
{
	wait_event(conn->cdc_pend_tx_wq, !atomic_read(&conn->cdc_pend_tx_wr));
}

/* Send a SMC-D CDC header.
 * This increments the free space available in our send buffer.
 * Also update the confirmed receive buffer with what was sent to the peer.
 */
int smcd_cdc_msg_send(struct smc_connection *conn)
{
	struct smc_sock *smc = container_of(conn, struct smc_sock, conn);
	union smc_host_cursor curs;
	struct smcd_cdc_msg cdc;
	int rc, diff;

	memset(&cdc, 0, sizeof(cdc));
	cdc.common.type = SMC_CDC_MSG_TYPE;
	curs.acurs.counter = atomic64_read(&conn->local_tx_ctrl.prod.acurs);
	cdc.prod.wrap = curs.wrap;
	cdc.prod.count = curs.count;
	curs.acurs.counter = atomic64_read(&conn->local_tx_ctrl.cons.acurs);
	cdc.cons.wrap = curs.wrap;
	cdc.cons.count = curs.count;
	cdc.cons.prod_flags = conn->local_tx_ctrl.prod_flags;
	cdc.cons.conn_state_flags = conn->local_tx_ctrl.conn_state_flags;
	rc = smcd_tx_ism_write(conn, &cdc, sizeof(cdc), 0, 1);
	if (rc)
		return rc;
	smc_curs_copy(&conn->rx_curs_confirmed, &curs, conn);
	conn->local_rx_ctrl.prod_flags.cons_curs_upd_req = 0;

	if (smc_ism_support_dmb_nocopy(conn->lgr->smcd))
		/* if local sndbuf shares the same memory region with
		 * peer DMB, then don't update the tx_curs_fin
		 * and sndbuf_space until peer has consumed the data.
		 */
		return 0;

	/* Calculate transmitted data and increment free send buffer space */
	diff = smc_curs_diff(conn->sndbuf_desc->len, &conn->tx_curs_fin,
			     &conn->tx_curs_sent);
	/* increased by confirmed number of bytes */
	smp_mb__before_atomic();
	atomic_add(diff, &conn->sndbuf_space);
	/* guarantee 0 <= sndbuf_space <= sndbuf_desc->len */
	smp_mb__after_atomic();
	smc_curs_copy(&conn->tx_curs_fin, &conn->tx_curs_sent, conn);

	smc_tx_sndbuf_nonfull(smc);
	return rc;
}

/********************************* receive ***********************************/

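/* compare two 16-bit CDC sequence numbers; the cast to s16 keeps the
 * comparison correct across sequence number wraparound
 */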
static inline bool smc_cdc_before(u16 seq1, u16 seq2)
{
	return (s16)(seq1 - seq2) < 0;
}

static void smc_cdc_handle_urg_data_arrival(struct smc_sock *smc,
					    int *diff_prod)
{
	struct smc_connection *conn = &smc->conn;
	char *base;

	/* new data included urgent business */
	smc_curs_copy(&conn->urg_curs, &conn->local_rx_ctrl.prod, conn);
	conn->urg_state = SMC_URG_VALID;
	if (!sock_flag(&smc->sk, SOCK_URGINLINE))
		/* we'll skip the urgent byte, so don't account for it */
		(*diff_prod)--;
	base = (char *)conn->rmb_desc->cpu_addr + conn->rx_off;
	if (conn->urg_curs.count)
		conn->urg_rx_byte = *(base + conn->urg_curs.count - 1);
	else
		conn->urg_rx_byte = *(base + conn->rmb_desc->len - 1);
	sk_send_sigurg(&smc->sk);
}

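/* validate a CDC failover message: the sequence number it carries must have
 * been seen already; otherwise CDC messages were lost on the old link and the
 * connection is aborted
 */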
static void smc_cdc_msg_validate(struct smc_sock *smc, struct smc_cdc_msg *cdc,
				 struct smc_link *link)
{
	struct smc_connection *conn = &smc->conn;
	u16 recv_seq = ntohs(cdc->seqno);
	s16 diff;

	/* check that seqnum was seen before */
	diff = conn->local_rx_ctrl.seqno - recv_seq;
	if (diff < 0) { /* diff larger than 0x7fff */
		/* drop connection */
		conn->out_of_sync = 1;	/* prevent any further receives */
		spin_lock_bh(&conn->send_lock);
		conn->local_tx_ctrl.conn_state_flags.peer_conn_abort = 1;
		conn->lnk = link;
		spin_unlock_bh(&conn->send_lock);
		sock_hold(&smc->sk); /* sock_put in abort_work */
		if (!queue_work(smc_close_wq, &conn->abort_work))
			sock_put(&smc->sk);
	}
}

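/* apply a received CDC message to the connection: update the local view of
 * both cursors, account freed peer receive buffer space and newly arrived
 * data, and wake up readers, writers and the close machinery as needed
 */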
static void smc_cdc_msg_recv_action(struct smc_sock *smc,
				    struct smc_cdc_msg *cdc)
{
	union smc_host_cursor cons_old, prod_old;
	struct smc_connection *conn = &smc->conn;
	int diff_cons, diff_prod, diff_tx;

	smc_curs_copy(&prod_old, &conn->local_rx_ctrl.prod, conn);
	smc_curs_copy(&cons_old, &conn->local_rx_ctrl.cons, conn);
	smc_cdc_msg_to_host(&conn->local_rx_ctrl, cdc, conn);

	diff_cons = smc_curs_diff(conn->peer_rmbe_size, &cons_old,
				  &conn->local_rx_ctrl.cons);
	if (diff_cons) {
		/* peer_rmbe_space is decreased during data transfer with RDMA
		 * write
		 */
		smp_mb__before_atomic();
		atomic_add(diff_cons, &conn->peer_rmbe_space);
		/* guarantee 0 <= peer_rmbe_space <= peer_rmbe_size */
		smp_mb__after_atomic();

		/* if local sndbuf shares the same memory region with
		 * peer RMB, then update tx_curs_fin and sndbuf_space
		 * here since peer has already consumed the data.
		 */
		if (conn->lgr->is_smcd &&
		    smc_ism_support_dmb_nocopy(conn->lgr->smcd)) {
			/* Calculate consumed data and
			 * increment free send buffer space.
			 */
			diff_tx = smc_curs_diff(conn->sndbuf_desc->len,
						&conn->tx_curs_fin,
						&conn->local_rx_ctrl.cons);
			/* increase local sndbuf space and fin_curs */
			smp_mb__before_atomic();
			atomic_add(diff_tx, &conn->sndbuf_space);
			/* guarantee 0 <= sndbuf_space <= sndbuf_desc->len */
			smp_mb__after_atomic();
			smc_curs_copy(&conn->tx_curs_fin,
				      &conn->local_rx_ctrl.cons, conn);

			smc_tx_sndbuf_nonfull(smc);
		}
	}

	diff_prod = smc_curs_diff(conn->rmb_desc->len, &prod_old,
				  &conn->local_rx_ctrl.prod);
	if (diff_prod) {
		if (conn->local_rx_ctrl.prod_flags.urg_data_present)
			smc_cdc_handle_urg_data_arrival(smc, &diff_prod);
		/* bytes_to_rcv is decreased in smc_recvmsg */
		smp_mb__before_atomic();
		atomic_add(diff_prod, &conn->bytes_to_rcv);
		/* guarantee 0 <= bytes_to_rcv <= rmb_desc->len */
		smp_mb__after_atomic();
		smc->sk.sk_data_ready(&smc->sk);
	} else {
		if (conn->local_rx_ctrl.prod_flags.write_blocked)
			smc->sk.sk_data_ready(&smc->sk);
		if (conn->local_rx_ctrl.prod_flags.urg_data_pending)
			conn->urg_state = SMC_URG_NOTYET;
	}

	/* trigger sndbuf consumer: RDMA write into peer RMBE and CDC */
	if ((diff_cons && smc_tx_prepared_sends(conn)) ||
	    conn->local_rx_ctrl.prod_flags.cons_curs_upd_req ||
	    conn->local_rx_ctrl.prod_flags.urg_data_pending) {
		if (!sock_owned_by_user(&smc->sk))
			smc_tx_pending(conn);
		else
			conn->tx_in_release_sock = true;
	}

	if (diff_cons && conn->urg_tx_pend &&
	    atomic_read(&conn->peer_rmbe_space) == conn->peer_rmbe_size) {
		/* urg data confirmed by peer, indicate we're ready for more */
		conn->urg_tx_pend = false;
		smc->sk.sk_write_space(&smc->sk);
	}

	if (conn->local_rx_ctrl.conn_state_flags.peer_conn_abort) {
		smc->sk.sk_err = ECONNRESET;
		conn->local_tx_ctrl.conn_state_flags.peer_conn_abort = 1;
	}
	if (smc_cdc_rxed_any_close_or_senddone(conn)) {
		smc->sk.sk_shutdown |= RCV_SHUTDOWN;
		if (smc->clcsock && smc->clcsock->sk)
			smc->clcsock->sk->sk_shutdown |= RCV_SHUTDOWN;
		smc_sock_set_flag(&smc->sk, SOCK_DONE);
		sock_hold(&smc->sk); /* sock_put in close_work */
		if (!queue_work(smc_close_wq, &conn->close_work))
			sock_put(&smc->sk);
	}
}

/* called under tasklet context */
static void smc_cdc_msg_recv(struct smc_sock *smc, struct smc_cdc_msg *cdc)
{
	sock_hold(&smc->sk);
	bh_lock_sock(&smc->sk);
	smc_cdc_msg_recv_action(smc, cdc);
	bh_unlock_sock(&smc->sk);
	sock_put(&smc->sk); /* no free sk in softirq-context */
}

/* Schedule a tasklet for this connection. Triggered from the ISM device IRQ
 * handler to indicate update in the DMBE.
 */
static void smcd_cdc_rx_tsklet(struct tasklet_struct *t)
{
	struct smc_connection *conn = from_tasklet(conn, t, rx_tsklet);
	struct smcd_cdc_msg *data_cdc;
	struct smcd_cdc_msg cdc;
	struct smc_sock *smc;

	if (!conn || conn->killed)
		return;

	data_cdc = (struct smcd_cdc_msg *)conn->rmb_desc->cpu_addr;
	smcd_curs_copy(&cdc.prod, &data_cdc->prod, conn);
	smcd_curs_copy(&cdc.cons, &data_cdc->cons, conn);
	smc = container_of(conn, struct smc_sock, conn);
	smc_cdc_msg_recv(smc, (struct smc_cdc_msg *)&cdc);
}

/* Initialize receive tasklet. Called from ISM device IRQ handler to start
 * receiver side.
 */
void smcd_cdc_rx_init(struct smc_connection *conn)
{
	tasklet_setup(&conn->rx_tsklet, smcd_cdc_rx_tsklet);
}

/***************************** init, exit, misc ******************************/

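/* completion handler for a received CDC message: look up the connection the
 * message belongs to and feed the message into the receive path
 */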
static void smc_cdc_rx_handler(struct ib_wc *wc, void *buf)
{
	struct smc_link *link = (struct smc_link *)wc->qp->qp_context;
	struct smc_cdc_msg *cdc = buf;
	struct smc_connection *conn;
	struct smc_link_group *lgr;
	struct smc_sock *smc;

	if (wc->byte_len < offsetof(struct smc_cdc_msg, reserved))
		return; /* short message */
	if (cdc->len != SMC_WR_TX_SIZE)
		return; /* invalid message */

	/* lookup connection */
	lgr = smc_get_lgr(link);
	read_lock_bh(&lgr->conns_lock);
	conn = smc_lgr_find_conn(ntohl(cdc->token), lgr);
	read_unlock_bh(&lgr->conns_lock);
	if (!conn || conn->out_of_sync)
		return;
	smc = container_of(conn, struct smc_sock, conn);

	if (cdc->prod_flags.failover_validation) {
		smc_cdc_msg_validate(smc, cdc, link);
		return;
	}
	if (smc_cdc_before(ntohs(cdc->seqno),
			   conn->local_rx_ctrl.seqno))
		/* received seqno is old */
		return;

	smc_cdc_msg_recv(smc, cdc);
}

static struct smc_wr_rx_handler smc_cdc_rx_handlers[] = {
	{
		.handler	= smc_cdc_rx_handler,
		.type		= SMC_CDC_MSG_TYPE
	},
	{
		.handler	= NULL,
	}
};

int __init smc_cdc_init(void)
{
	struct smc_wr_rx_handler *handler;
	int rc = 0;

	for (handler = smc_cdc_rx_handlers; handler->handler; handler++) {
		INIT_HLIST_NODE(&handler->list);
		rc = smc_wr_rx_register_handler(handler);
		if (rc)
			break;
	}
	return rc;
}