// SPDX-License-Identifier: GPL-2.0
/*
 * Shared Memory Communications over RDMA (SMC-R) and RoCE
 *
 * Manage send buffer.
 * Producer:
 * Copy user space data into send buffer, if send buffer space available.
 * Consumer:
 * Trigger RDMA write into RMBE of peer and send CDC, if RMBE space available.
 *
 * Copyright IBM Corp. 2016
 *
 * Author(s):  Ursula Braun <ubraun@linux.vnet.ibm.com>
 */

#include <linux/net.h>
#include <linux/rcupdate.h>
#include <linux/workqueue.h>
#include <linux/sched/signal.h>

#include <net/sock.h>
#include <net/tcp.h>

#include "smc.h"
#include "smc_wr.h"
#include "smc_cdc.h"
#include "smc_close.h"
#include "smc_ism.h"
#include "smc_tx.h"
#include "smc_stats.h"
#include "smc_tracepoint.h"

#define SMC_TX_WORK_DELAY	0

/***************************** sndbuf producer *******************************/

/* callback implementation for sk.sk_write_space()
 * to wakeup sndbuf producers that blocked with smc_tx_wait().
 * called under sk_socket lock.
 */
static void smc_tx_write_space(struct sock *sk)
{
        struct socket *sock = sk->sk_socket;
        struct smc_sock *smc = smc_sk(sk);
        struct socket_wq *wq;

        /* similar to sk_stream_write_space */
        if (atomic_read(&smc->conn.sndbuf_space) && sock) {
                if (test_bit(SOCK_NOSPACE, &sock->flags))
                        SMC_STAT_RMB_TX_FULL(smc, !smc->conn.lnk);
                clear_bit(SOCK_NOSPACE, &sock->flags);
                rcu_read_lock();
                wq = rcu_dereference(sk->sk_wq);
                if (skwq_has_sleeper(wq))
                        wake_up_interruptible_poll(&wq->wait,
                                                   EPOLLOUT | EPOLLWRNORM |
                                                   EPOLLWRBAND);
                if (wq && wq->fasync_list && !(sk->sk_shutdown & SEND_SHUTDOWN))
                        sock_wake_async(wq, SOCK_WAKE_SPACE, POLL_OUT);
                rcu_read_unlock();
        }
}

/* Wakeup sndbuf producers that blocked with smc_tx_wait().
 * Cf. tcp_data_snd_check()=>tcp_check_space()=>tcp_new_space().
 */
void smc_tx_sndbuf_nonfull(struct smc_sock *smc)
{
        if (smc->sk.sk_socket &&
            test_bit(SOCK_NOSPACE, &smc->sk.sk_socket->flags))
                smc->sk.sk_write_space(&smc->sk);
}

/* blocks sndbuf producer until at least one byte of free space available
 * or urgent Byte was consumed
 */
static int smc_tx_wait(struct smc_sock *smc, int flags)
{
        DEFINE_WAIT_FUNC(wait, woken_wake_function);
        struct smc_connection *conn = &smc->conn;
        struct sock *sk = &smc->sk;
        long timeo;
        int rc = 0;

        /* similar to sk_stream_wait_memory */
        timeo = sock_sndtimeo(sk, flags & MSG_DONTWAIT);
        add_wait_queue(sk_sleep(sk), &wait);
        while (1) {
                sk_set_bit(SOCKWQ_ASYNC_NOSPACE, sk);
                if (sk->sk_err ||
                    (sk->sk_shutdown & SEND_SHUTDOWN) ||
                    conn->killed ||
                    conn->local_tx_ctrl.conn_state_flags.peer_done_writing) {
                        rc = -EPIPE;
                        break;
                }
                if (smc_cdc_rxed_any_close(conn)) {
                        rc = -ECONNRESET;
                        break;
                }
                if (!timeo) {
                        /* ensure EPOLLOUT is subsequently generated */
                        set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
                        rc = -EAGAIN;
                        break;
                }
                if (signal_pending(current)) {
                        rc = sock_intr_errno(timeo);
                        break;
                }
                sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);
                if (atomic_read(&conn->sndbuf_space) && !conn->urg_tx_pend)
                        break; /* at least 1 byte of free & no urgent data */
                set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
                sk_wait_event(sk, &timeo,
                              READ_ONCE(sk->sk_err) ||
                              (READ_ONCE(sk->sk_shutdown) & SEND_SHUTDOWN) ||
                              smc_cdc_rxed_any_close(conn) ||
                              (atomic_read(&conn->sndbuf_space) &&
                               !conn->urg_tx_pend),
                              &wait);
        }
        remove_wait_queue(sk_sleep(sk), &wait);
        return rc;
}

static bool smc_tx_is_corked(struct smc_sock *smc)
{
        struct tcp_sock *tp = tcp_sk(smc->clcsock->sk);

        return (tp->nonagle & TCP_NAGLE_CORK) ? true : false;
}

/* If we have pending CDC messages, do not send:
 * Because CQE of this CDC message will happen shortly, it gives
 * a chance to coalesce future sendmsg() payload in to one RDMA Write,
 * without need for a timer, and with no latency trade off.
 * Algorithm here:
 *  1. First message should never cork
 *  2. If we have pending Tx CDC messages, wait for the first CDC
 *     message's completion
 *  3. Don't cork to much data in a single RDMA Write to prevent burst
 *     traffic, total corked message should not exceed sendbuf/2
 */
static bool smc_should_autocork(struct smc_sock *smc)
{
        struct smc_connection *conn = &smc->conn;
        int corking_size;

        corking_size = min_t(unsigned int, conn->sndbuf_desc->len >> 1,
                             sock_net(&smc->sk)->smc.sysctl_autocorking_size);

        if (atomic_read(&conn->cdc_pend_tx_wr) == 0 ||
            smc_tx_prepared_sends(conn) > corking_size)
                return false;
        return true;
}
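
/* Illustrative note (not from the original source): with a 64 KB sndbuf and
 * an autocorking limit of 64 KB, corking_size above is min(32 KB, 64 KB) =
 * 32 KB. While a CDC write is still pending and less than 32 KB is queued in
 * the send buffer, smc_should_autocork() returns true and new sendmsg()
 * payload is held back to be coalesced into the next RDMA write.
 */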

static bool smc_tx_should_cork(struct smc_sock *smc, struct msghdr *msg)
{
        struct smc_connection *conn = &smc->conn;

        if (smc_should_autocork(smc))
                return true;

        /* for a corked socket defer the RDMA writes if
         * sndbuf_space is still available. The applications
         * should known how/when to uncork it.
         */
        if ((msg->msg_flags & MSG_MORE ||
             smc_tx_is_corked(smc)) &&
            atomic_read(&conn->sndbuf_space))
                return true;

        return false;
}
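
/* Usage note (illustrative, not from the original source): besides
 * autocorking, an application can defer transmission explicitly by passing
 * MSG_MORE to sendmsg() or by corking the clcsock with TCP_CORK; as long as
 * sndbuf space remains, the RDMA write is postponed until the application
 * uncorks or the send buffer fills up.
 */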

/* sndbuf producer: main API called by socket layer.
 * called under sock lock.
 */
int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len)
{
        size_t copylen, send_done = 0, send_remaining = len;
        size_t chunk_len, chunk_off, chunk_len_sum;
        struct smc_connection *conn = &smc->conn;
        union smc_host_cursor prep;
        struct sock *sk = &smc->sk;
        char *sndbuf_base;
        int tx_cnt_prep;
        int writespace;
        int rc, chunk;

        /* This should be in poll */
        sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);

        if (sk->sk_err || (sk->sk_shutdown & SEND_SHUTDOWN)) {
                rc = -EPIPE;
                goto out_err;
        }

        if (sk->sk_state == SMC_INIT)
                return -ENOTCONN;

        if (len > conn->sndbuf_desc->len)
                SMC_STAT_RMB_TX_SIZE_SMALL(smc, !conn->lnk);

        if (len > conn->peer_rmbe_size)
                SMC_STAT_RMB_TX_PEER_SIZE_SMALL(smc, !conn->lnk);

        if (msg->msg_flags & MSG_OOB)
                SMC_STAT_INC(smc, urg_data_cnt);

        while (msg_data_left(msg)) {
                if (smc->sk.sk_shutdown & SEND_SHUTDOWN ||
                    (smc->sk.sk_err == ECONNABORTED) ||
                    conn->killed)
                        return -EPIPE;
                if (smc_cdc_rxed_any_close(conn))
                        return send_done ?: -ECONNRESET;

                if (msg->msg_flags & MSG_OOB)
                        conn->local_tx_ctrl.prod_flags.urg_data_pending = 1;

                if (!atomic_read(&conn->sndbuf_space) || conn->urg_tx_pend) {
                        if (send_done)
                                return send_done;
                        rc = smc_tx_wait(smc, msg->msg_flags);
                        if (rc)
                                goto out_err;
                        continue;
                }

                /* initialize variables for 1st iteration of subsequent loop */
                /* could be just 1 byte, even after smc_tx_wait above */
                writespace = atomic_read(&conn->sndbuf_space);
                /* not more than what user space asked for */
                copylen = min_t(size_t, send_remaining, writespace);
                /* determine start of sndbuf */
                sndbuf_base = conn->sndbuf_desc->cpu_addr;
                smc_curs_copy(&prep, &conn->tx_curs_prep, conn);
                tx_cnt_prep = prep.count;
                /* determine chunks where to write into sndbuf */
                /* either unwrapped case, or 1st chunk of wrapped case */
                chunk_len = min_t(size_t, copylen, conn->sndbuf_desc->len -
                                  tx_cnt_prep);
                chunk_len_sum = chunk_len;
                chunk_off = tx_cnt_prep;
                for (chunk = 0; chunk < 2; chunk++) {
                        rc = memcpy_from_msg(sndbuf_base + chunk_off,
                                             msg, chunk_len);
                        if (rc) {
                                smc_sndbuf_sync_sg_for_device(conn);
                                if (send_done)
                                        return send_done;
                                goto out_err;
                        }
                        send_done += chunk_len;
                        send_remaining -= chunk_len;

                        if (chunk_len_sum == copylen)
                                break; /* either on 1st or 2nd iteration */
                        /* prepare next (== 2nd) iteration */
                        chunk_len = copylen - chunk_len; /* remainder */
                        chunk_len_sum += chunk_len;
                        chunk_off = 0; /* modulo offset in send ring buffer */
                }
                smc_sndbuf_sync_sg_for_device(conn);
                /* update cursors */
                smc_curs_add(conn->sndbuf_desc->len, &prep, copylen);
                smc_curs_copy(&conn->tx_curs_prep, &prep, conn);
                /* increased in send tasklet smc_cdc_tx_handler() */
                smp_mb__before_atomic();
                atomic_sub(copylen, &conn->sndbuf_space);
                /* guarantee 0 <= sndbuf_space <= sndbuf_desc->len */
                smp_mb__after_atomic();
                /* since we just produced more new data into sndbuf,
                 * trigger sndbuf consumer: RDMA write into peer RMBE and CDC
                 */
                if ((msg->msg_flags & MSG_OOB) && !send_remaining)
                        conn->urg_tx_pend = true;
                /* If we need to cork, do nothing and wait for the next
                 * sendmsg() call or push on tx completion
                 */
                if (!smc_tx_should_cork(smc, msg))
                        smc_tx_sndbuf_nonempty(conn);

                trace_smc_tx_sendmsg(smc, copylen);
        } /* while (msg_data_left(msg)) */

        return send_done;

out_err:
        rc = sk_stream_error(sk, msg->msg_flags, rc);
        /* make sure we wake any epoll edge trigger waiter */
        if (unlikely(rc == -EAGAIN))
                sk->sk_write_space(sk);
        return rc;
}
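
/* Worked example for the chunked copy above (illustrative, not from the
 * original source): with sndbuf_desc->len = 16 KB, tx_cnt_prep = 14 KB and
 * copylen = 4 KB, the first chunk copies 2 KB at offset 14 KB (up to the end
 * of the ring buffer) and the second chunk copies the remaining 2 KB at
 * offset 0, after which the prep cursor wraps.
 */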

/***************************** sndbuf consumer *******************************/

/* sndbuf consumer: actual data transfer of one target chunk with ISM write */
int smcd_tx_ism_write(struct smc_connection *conn, void *data, size_t len,
                      u32 offset, int signal)
{
        int rc;

        rc = smc_ism_write(conn->lgr->smcd, conn->peer_token,
                           conn->peer_rmbe_idx, signal, conn->tx_off + offset,
                           data, len);
        if (rc)
                conn->local_tx_ctrl.conn_state_flags.peer_conn_abort = 1;
        return rc;
}

/* sndbuf consumer: actual data transfer of one target chunk with RDMA write */
static int smc_tx_rdma_write(struct smc_connection *conn, int peer_rmbe_offset,
                             int num_sges, struct ib_rdma_wr *rdma_wr)
{
        struct smc_link_group *lgr = conn->lgr;
        struct smc_link *link = conn->lnk;
        int rc;

        rdma_wr->wr.wr_id = smc_wr_tx_get_next_wr_id(link);
        rdma_wr->wr.num_sge = num_sges;
        rdma_wr->remote_addr =
                lgr->rtokens[conn->rtoken_idx][link->link_idx].dma_addr +
                /* RMBE within RMB */
                conn->tx_off +
                /* offset within RMBE */
                peer_rmbe_offset;
        rdma_wr->rkey = lgr->rtokens[conn->rtoken_idx][link->link_idx].rkey;
        rc = ib_post_send(link->roce_qp, &rdma_wr->wr, NULL);
        if (rc)
                smcr_link_down_cond_sched(link);
        return rc;
}

/* sndbuf consumer */
static inline void smc_tx_advance_cursors(struct smc_connection *conn,
                                          union smc_host_cursor *prod,
                                          union smc_host_cursor *sent,
                                          size_t len)
{
        smc_curs_add(conn->peer_rmbe_size, prod, len);
        /* increased in recv tasklet smc_cdc_msg_rcv() */
        smp_mb__before_atomic();
        /* data in flight reduces usable snd_wnd */
        atomic_sub(len, &conn->peer_rmbe_space);
        /* guarantee 0 <= peer_rmbe_space <= peer_rmbe_size */
        smp_mb__after_atomic();
        smc_curs_add(conn->sndbuf_desc->len, sent, len);
}
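
/* Accounting sketch (illustrative, not from the original source): after an
 * RDMA write of len bytes, the producer cursor into the peer RMBE and the
 * local sent cursor both advance by len (modulo the respective ring sizes),
 * and peer_rmbe_space shrinks by len; it grows again only once the peer's
 * CDC message confirms consumption in smc_cdc_msg_rcv().
 */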

/* SMC-R helper for smc_tx_rdma_writes() */
static int smcr_tx_rdma_writes(struct smc_connection *conn, size_t len,
                               size_t src_off, size_t src_len,
                               size_t dst_off, size_t dst_len,
                               struct smc_rdma_wr *wr_rdma_buf)
{
        struct smc_link *link = conn->lnk;

        dma_addr_t dma_addr =
                sg_dma_address(conn->sndbuf_desc->sgt[link->link_idx].sgl);
        u64 virt_addr = (uintptr_t)conn->sndbuf_desc->cpu_addr;
        int src_len_sum = src_len, dst_len_sum = dst_len;
        int sent_count = src_off;
        int srcchunk, dstchunk;
        int num_sges;
        int rc;

        for (dstchunk = 0; dstchunk < 2; dstchunk++) {
                struct ib_rdma_wr *wr = &wr_rdma_buf->wr_tx_rdma[dstchunk];
                struct ib_sge *sge = wr->wr.sg_list;
                u64 base_addr = dma_addr;

                if (dst_len < link->qp_attr.cap.max_inline_data) {
                        base_addr = virt_addr;
                        wr->wr.send_flags |= IB_SEND_INLINE;
                } else {
                        wr->wr.send_flags &= ~IB_SEND_INLINE;
                }

                num_sges = 0;
                for (srcchunk = 0; srcchunk < 2; srcchunk++) {
                        sge[srcchunk].addr = conn->sndbuf_desc->is_vm ?
                                (virt_addr + src_off) : (base_addr + src_off);
                        sge[srcchunk].length = src_len;
                        if (conn->sndbuf_desc->is_vm)
                                sge[srcchunk].lkey =
                                        conn->sndbuf_desc->mr[link->link_idx]->lkey;
                        num_sges++;

                        src_off += src_len;
                        if (src_off >= conn->sndbuf_desc->len)
                                src_off -= conn->sndbuf_desc->len;
                                        /* modulo in send ring */
                        if (src_len_sum == dst_len)
                                break; /* either on 1st or 2nd iteration */
                        /* prepare next (== 2nd) iteration */
                        src_len = dst_len - src_len; /* remainder */
                        src_len_sum += src_len;
                }
                rc = smc_tx_rdma_write(conn, dst_off, num_sges, wr);
                if (rc)
                        return rc;
                if (dst_len_sum == len)
                        break; /* either on 1st or 2nd iteration */
                /* prepare next (== 2nd) iteration */
                dst_off = 0; /* modulo offset in RMBE ring buffer */
                dst_len = len - dst_len; /* remainder */
                dst_len_sum += dst_len;
                src_len = min_t(int, dst_len, conn->sndbuf_desc->len -
                                sent_count);
                src_len_sum = src_len;
        }
        return 0;
}

/* SMC-D helper for smc_tx_rdma_writes() */
static int smcd_tx_rdma_writes(struct smc_connection *conn, size_t len,
                               size_t src_off, size_t src_len,
                               size_t dst_off, size_t dst_len)
{
        int src_len_sum = src_len, dst_len_sum = dst_len;
        int srcchunk, dstchunk;
        int rc;

        for (dstchunk = 0; dstchunk < 2; dstchunk++) {
                for (srcchunk = 0; srcchunk < 2; srcchunk++) {
                        void *data = conn->sndbuf_desc->cpu_addr + src_off;

                        rc = smcd_tx_ism_write(conn, data, src_len, dst_off +
                                               sizeof(struct smcd_cdc_msg), 0);
                        if (rc)
                                return rc;
                        dst_off += src_len;
                        src_off += src_len;
                        if (src_off >= conn->sndbuf_desc->len)
                                src_off -= conn->sndbuf_desc->len;
                                        /* modulo in send ring */
                        if (src_len_sum == dst_len)
                                break; /* either on 1st or 2nd iteration */
                        /* prepare next (== 2nd) iteration */
                        src_len = dst_len - src_len; /* remainder */
                        src_len_sum += src_len;
                }
                if (dst_len_sum == len)
                        break; /* either on 1st or 2nd iteration */
                /* prepare next (== 2nd) iteration */
                dst_off = 0; /* modulo offset in RMBE ring buffer */
                dst_len = len - dst_len; /* remainder */
                dst_len_sum += dst_len;
                src_len = min_t(int, dst_len, conn->sndbuf_desc->len - src_off);
                src_len_sum = src_len;
        }
        return 0;
}

/* sndbuf consumer: prepare all necessary (src&dst) chunks of data transmit;
 * usable snd_wnd as max transmit
 */
static int smc_tx_rdma_writes(struct smc_connection *conn,
                              struct smc_rdma_wr *wr_rdma_buf)
{
        size_t len, src_len, dst_off, dst_len; /* current chunk values */
        union smc_host_cursor sent, prep, prod, cons;
        struct smc_cdc_producer_flags *pflags;
        int to_send, rmbespace;
        int rc;

        /* source: sndbuf */
        smc_curs_copy(&sent, &conn->tx_curs_sent, conn);
        smc_curs_copy(&prep, &conn->tx_curs_prep, conn);
        /* cf. wmem_alloc - (snd_max - snd_una) */
        to_send = smc_curs_diff(conn->sndbuf_desc->len, &sent, &prep);
        if (to_send <= 0)
                return 0;

        /* destination: RMBE */
        /* cf. snd_wnd */
        rmbespace = atomic_read(&conn->peer_rmbe_space);
        if (rmbespace <= 0) {
                struct smc_sock *smc = container_of(conn, struct smc_sock,
                                                    conn);
                SMC_STAT_RMB_TX_PEER_FULL(smc, !conn->lnk);
                return 0;
        }
        smc_curs_copy(&prod, &conn->local_tx_ctrl.prod, conn);
        smc_curs_copy(&cons, &conn->local_rx_ctrl.cons, conn);

        /* if usable snd_wnd closes ask peer to advertise once it opens again */
        pflags = &conn->local_tx_ctrl.prod_flags;
        pflags->write_blocked = (to_send >= rmbespace);
        /* cf. usable snd_wnd */
        len = min(to_send, rmbespace);

        /* initialize variables for first iteration of subsequent nested loop */
        dst_off = prod.count;
        if (prod.wrap == cons.wrap) {
                /* the filled destination area is unwrapped,
                 * hence the available free destination space is wrapped
                 * and we need 2 destination chunks of sum len; start with 1st
                 * which is limited by what's available in sndbuf
                 */
                dst_len = min_t(size_t,
                                conn->peer_rmbe_size - prod.count, len);
        } else {
                /* the filled destination area is wrapped,
                 * hence the available free destination space is unwrapped
                 * and we need a single destination chunk of entire len
                 */
                dst_len = len;
        }
        /* dst_len determines the maximum src_len */
        if (sent.count + dst_len <= conn->sndbuf_desc->len) {
                /* unwrapped src case: single chunk of entire dst_len */
                src_len = dst_len;
        } else {
                /* wrapped src case: 2 chunks of sum dst_len; start with 1st: */
                src_len = conn->sndbuf_desc->len - sent.count;
        }

        if (conn->lgr->is_smcd)
                rc = smcd_tx_rdma_writes(conn, len, sent.count, src_len,
                                         dst_off, dst_len);
        else
                rc = smcr_tx_rdma_writes(conn, len, sent.count, src_len,
                                         dst_off, dst_len, wr_rdma_buf);
        if (rc)
                return rc;

        if (conn->urg_tx_pend && len == to_send)
                pflags->urg_data_present = 1;
        smc_tx_advance_cursors(conn, &prod, &sent, len);
        /* update connection's cursors with advanced local cursors */
        smc_curs_copy(&conn->local_tx_ctrl.prod, &prod, conn);
                                                        /* dst: peer RMBE */
        smc_curs_copy(&conn->tx_curs_sent, &sent, conn);/* src: local sndbuf */

        return 0;
}
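
/* Worked example for the wrap handling above (illustrative, not from the
 * original source): with peer_rmbe_size = 32 KB, prod.count = 30 KB,
 * prod.wrap == cons.wrap and len = 6 KB, dst_len = min(32 KB - 30 KB, 6 KB)
 * = 2 KB for the first destination chunk; the helper's second iteration then
 * restarts at dst_off = 0 with the remaining 4 KB. The source side is split
 * the same way whenever sent.count + dst_len exceeds sndbuf_desc->len.
 */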

/* Wakeup sndbuf consumers from any context (IRQ or process)
 * since there is more data to transmit; usable snd_wnd as max transmit
 */
static int smcr_tx_sndbuf_nonempty(struct smc_connection *conn)
{
        struct smc_cdc_producer_flags *pflags = &conn->local_tx_ctrl.prod_flags;
        struct smc_link *link = conn->lnk;
        struct smc_rdma_wr *wr_rdma_buf;
        struct smc_cdc_tx_pend *pend;
        struct smc_wr_buf *wr_buf;
        int rc;

        if (!link || !smc_wr_tx_link_hold(link))
                return -ENOLINK;
        rc = smc_cdc_get_free_slot(conn, link, &wr_buf, &wr_rdma_buf, &pend);
        if (rc < 0) {
                smc_wr_tx_link_put(link);
                if (rc == -EBUSY) {
                        struct smc_sock *smc =
                                container_of(conn, struct smc_sock, conn);

                        if (smc->sk.sk_err == ECONNABORTED)
                                return sock_error(&smc->sk);
                        if (conn->killed)
                                return -EPIPE;
                        rc = 0;
                        mod_delayed_work(conn->lgr->tx_wq, &conn->tx_work,
                                         SMC_TX_WORK_DELAY);
                }
                return rc;
        }

        spin_lock_bh(&conn->send_lock);
        if (link != conn->lnk) {
                /* link of connection changed, tx_work will restart */
                smc_wr_tx_put_slot(link,
                                   (struct smc_wr_tx_pend_priv *)pend);
                rc = -ENOLINK;
                goto out_unlock;
        }
        if (!pflags->urg_data_present) {
                rc = smc_tx_rdma_writes(conn, wr_rdma_buf);
                if (rc) {
                        smc_wr_tx_put_slot(link,
                                           (struct smc_wr_tx_pend_priv *)pend);
                        goto out_unlock;
                }
        }

        rc = smc_cdc_msg_send(conn, wr_buf, pend);
        if (!rc && pflags->urg_data_present) {
                pflags->urg_data_pending = 0;
                pflags->urg_data_present = 0;
        }

out_unlock:
        spin_unlock_bh(&conn->send_lock);
        smc_wr_tx_link_put(link);
        return rc;
}

static int smcd_tx_sndbuf_nonempty(struct smc_connection *conn)
{
        struct smc_cdc_producer_flags *pflags = &conn->local_tx_ctrl.prod_flags;
        int rc = 0;

        spin_lock_bh(&conn->send_lock);
        if (!pflags->urg_data_present)
                rc = smc_tx_rdma_writes(conn, NULL);
        if (!rc)
                rc = smcd_cdc_msg_send(conn);

        if (!rc && pflags->urg_data_present) {
                pflags->urg_data_pending = 0;
                pflags->urg_data_present = 0;
        }
        spin_unlock_bh(&conn->send_lock);
        return rc;
}

int smc_tx_sndbuf_nonempty(struct smc_connection *conn)
{
        struct smc_sock *smc = container_of(conn, struct smc_sock, conn);
        int rc = 0;

        /* No data in the send queue */
        if (unlikely(smc_tx_prepared_sends(conn) <= 0))
                goto out;

        /* Peer don't have RMBE space */
        if (unlikely(atomic_read(&conn->peer_rmbe_space) <= 0)) {
                SMC_STAT_RMB_TX_PEER_FULL(smc, !conn->lnk);
                goto out;
        }

        if (conn->killed ||
            conn->local_rx_ctrl.conn_state_flags.peer_conn_abort) {
                rc = -EPIPE;    /* connection being aborted */
                goto out;
        }
        if (conn->lgr->is_smcd)
                rc = smcd_tx_sndbuf_nonempty(conn);
        else
                rc = smcr_tx_sndbuf_nonempty(conn);

        if (!rc) {
                /* trigger socket release if connection is closing */
                smc_close_wake_tx_prepared(smc);
        }

out:
        return rc;
}

/* Wakeup sndbuf consumers from process context
 * since there is more data to transmit. The caller
 * must hold sock lock.
 */
void smc_tx_pending(struct smc_connection *conn)
{
        struct smc_sock *smc = container_of(conn, struct smc_sock, conn);
        int rc;

        if (smc->sk.sk_err)
                return;

        rc = smc_tx_sndbuf_nonempty(conn);
        if (!rc && conn->local_rx_ctrl.prod_flags.write_blocked &&
            !atomic_read(&conn->bytes_to_rcv))
                conn->local_rx_ctrl.prod_flags.write_blocked = 0;
}

/* Wakeup sndbuf consumers from process context
 * since there is more data to transmit in locked
 * sock.
 */
void smc_tx_work(struct work_struct *work)
{
        struct smc_connection *conn = container_of(to_delayed_work(work),
                                                   struct smc_connection,
                                                   tx_work);
        struct smc_sock *smc = container_of(conn, struct smc_sock, conn);

        lock_sock(&smc->sk);
        smc_tx_pending(conn);
        release_sock(&smc->sk);
}

void smc_tx_consumer_update(struct smc_connection *conn, bool force)
{
        union smc_host_cursor cfed, cons, prod;
        int sender_free = conn->rmb_desc->len;
        int to_confirm;

        smc_curs_copy(&cons, &conn->local_tx_ctrl.cons, conn);
        smc_curs_copy(&cfed, &conn->rx_curs_confirmed, conn);
        to_confirm = smc_curs_diff(conn->rmb_desc->len, &cfed, &cons);
        if (to_confirm > conn->rmbe_update_limit) {
                smc_curs_copy(&prod, &conn->local_rx_ctrl.prod, conn);
                sender_free = conn->rmb_desc->len -
                              smc_curs_diff_large(conn->rmb_desc->len,
                                                  &cfed, &prod);
        }

        if (conn->local_rx_ctrl.prod_flags.cons_curs_upd_req ||
            force ||
            ((to_confirm > conn->rmbe_update_limit) &&
             ((sender_free <= (conn->rmb_desc->len / 2)) ||
              conn->local_rx_ctrl.prod_flags.write_blocked))) {
                if (conn->killed ||
                    conn->local_rx_ctrl.conn_state_flags.peer_conn_abort)
                        return;
                if ((smc_cdc_get_slot_and_msg_send(conn) < 0) &&
                    !conn->killed) {
                        queue_delayed_work(conn->lgr->tx_wq, &conn->tx_work,
                                           SMC_TX_WORK_DELAY);
                        return;
                }
        }
        if (conn->local_rx_ctrl.prod_flags.write_blocked &&
            !atomic_read(&conn->bytes_to_rcv))
                conn->local_rx_ctrl.prod_flags.write_blocked = 0;
}
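
/* Illustrative example (not from the original source): assuming
 * rmb_desc->len = 64 KB and rmbe_update_limit = 8 KB, a consumer cursor
 * update is sent once more than 8 KB of received data has been consumed but
 * not yet confirmed and the sender's remaining view of free space has
 * dropped to 32 KB or less (or the peer reported write_blocked); otherwise
 * no dedicated CDC message is sent and the cursor rides along with later
 * CDC traffic.
 */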

/***************************** send initialize *******************************/

/* Initialize send properties on connection establishment. NB: not __init! */
void smc_tx_init(struct smc_sock *smc)
{
        smc->sk.sk_write_space = smc_tx_write_space;
}