/* Copyright (c) 2019-2021, The Tor Project, Inc. */
/* See LICENSE for licensing information */

/**
 * \file congestion_control_flow.c
 * \brief Code that implements flow control for congestion controlled
 *        circuits.
 */

#define TOR_CONGESTION_CONTROL_FLOW_PRIVATE
#include "core/or/or.h"

#include "core/or/relay.h"
#include "core/mainloop/connection.h"
#include "core/or/connection_edge.h"
#include "core/mainloop/mainloop.h"
#include "core/or/congestion_control_common.h"
#include "core/or/congestion_control_flow.h"
#include "core/or/congestion_control_st.h"
#include "core/or/circuitlist.h"
#include "core/or/trace_probes_cc.h"
#include "feature/nodelist/networkstatus.h"
#include "trunnel/flow_control_cells.h"
#include "feature/control/control_events.h"
#include "lib/math/stats.h"

#include "core/or/connection_st.h"
#include "core/or/cell_st.h"
#include "app/config/config.h"
#include "core/or/conflux_util.h"
/** Cache consensus parameters */
static uint32_t xoff_client;
static uint32_t xoff_exit;

static uint32_t xon_change_pct;
static uint32_t xon_ewma_cnt;
static uint32_t xon_rate_bytes;

/** Metricsport stats */
uint64_t cc_stats_flow_num_xoff_sent;
uint64_t cc_stats_flow_num_xon_sent;
double cc_stats_flow_xoff_outbuf_ma = 0;
double cc_stats_flow_xon_outbuf_ma = 0;
/* In normal operation, we can get a burst of up to 32 cells before returning
 * to libevent to flush the outbuf. This is a heuristic from hardcoded values
 * and strange logic in connection_bucket_get_share(). */
#define MAX_EXPECTED_CELL_BURST 32

/* The following three are for dropmark rate limiting. They define when we
 * scale down our XON, XOFF, and xmit byte counts. Early scaling is beneficial
 * because it limits the ability of spurious XON/XOFF to be sent after large
 * amounts of data without XON/XOFF. At these limits, after 10MB of data (or
 * more), an adversary can only inject (log2(10MB)-log2(200*500))*100 ~= 1000
 * cells of fake XOFF/XON before the xmit byte count will be halved enough to
 * trigger a limit. */
#define XON_COUNT_SCALE_AT 200
#define XOFF_COUNT_SCALE_AT 200
#define ONE_MEGABYTE (UINT64_C(1) << 20)
#define TOTAL_XMIT_SCALE_AT (10 * ONE_MEGABYTE)
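
/* Mechanically (our reading of the estimate above; see
 * circuit_process_stream_xoff()/xon() below): each time num_xoff_recv or
 * num_xon_recv reaches its SCALE_AT value (200), the byte and cell counters
 * are all halved together. Because the validity check requires
 * total_bytes_xmit >= limit * num_x{on,off}_recv, every halving of
 * total_bytes_xmit shrinks the budget of XON/XOFF cells that will still be
 * accepted as valid, so an adversary gains only a logarithmic number of
 * spurious cells per byte of real traffic; that is where the log2() terms
 * in the estimate come from. */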
/**
 * Update global congestion control related consensus parameter values, every
 * consensus update.
 *
 * More details for each of the parameters can be found in proposal 324,
 * section 6.5 including tuning notes.
 */
void
flow_control_new_consensus_params(const networkstatus_t *ns)
{
#define CC_XOFF_CLIENT_DFLT 500
#define CC_XOFF_CLIENT_MIN 1
#define CC_XOFF_CLIENT_MAX 10000
  xoff_client = networkstatus_get_param(ns, "cc_xoff_client",
                 CC_XOFF_CLIENT_DFLT,
                 CC_XOFF_CLIENT_MIN,
                 CC_XOFF_CLIENT_MAX)*RELAY_PAYLOAD_SIZE;

#define CC_XOFF_EXIT_DFLT 500
#define CC_XOFF_EXIT_MIN 1
#define CC_XOFF_EXIT_MAX 10000
  xoff_exit = networkstatus_get_param(ns, "cc_xoff_exit",
                 CC_XOFF_EXIT_DFLT,
                 CC_XOFF_EXIT_MIN,
                 CC_XOFF_EXIT_MAX)*RELAY_PAYLOAD_SIZE;

#define CC_XON_CHANGE_PCT_DFLT 25
#define CC_XON_CHANGE_PCT_MIN 1
#define CC_XON_CHANGE_PCT_MAX 99
  xon_change_pct = networkstatus_get_param(ns, "cc_xon_change_pct",
                 CC_XON_CHANGE_PCT_DFLT,
                 CC_XON_CHANGE_PCT_MIN,
                 CC_XON_CHANGE_PCT_MAX);

#define CC_XON_RATE_BYTES_DFLT (500)
#define CC_XON_RATE_BYTES_MIN (1)
#define CC_XON_RATE_BYTES_MAX (5000)
  xon_rate_bytes = networkstatus_get_param(ns, "cc_xon_rate",
                 CC_XON_RATE_BYTES_DFLT,
                 CC_XON_RATE_BYTES_MIN,
                 CC_XON_RATE_BYTES_MAX)*RELAY_PAYLOAD_SIZE;

#define CC_XON_EWMA_CNT_DFLT (2)
#define CC_XON_EWMA_CNT_MIN (2)
#define CC_XON_EWMA_CNT_MAX (100)
  xon_ewma_cnt = networkstatus_get_param(ns, "cc_xon_ewma_cnt",
                 CC_XON_EWMA_CNT_DFLT,
                 CC_XON_EWMA_CNT_MIN,
                 CC_XON_EWMA_CNT_MAX);
}
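
/* Unit note (ours): the cc_xoff_* and cc_xon_rate consensus parameters are
 * specified in relay cells, so the RELAY_PAYLOAD_SIZE multiplications above
 * convert them to bytes. Assuming the current 498-byte relay cell payload
 * and the 500 cell defaults, xoff_client and xoff_exit both come out to
 * 500 * 498 = 249000 bytes (about 243 KiB) of buffered data before an XOFF
 * is sent. */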
/**
 * Send an XOFF for this stream, and note that we sent one.
 */
static void
circuit_send_stream_xoff(edge_connection_t *stream)
{
  xoff_cell_t xoff;
  uint8_t payload[CELL_PAYLOAD_SIZE];
  ssize_t xoff_size;

  memset(&xoff, 0, sizeof(xoff));
  memset(payload, 0, sizeof(payload));

  xoff_cell_set_version(&xoff, 0);

  if ((xoff_size = xoff_cell_encode(payload, CELL_PAYLOAD_SIZE, &xoff)) < 0) {
    log_warn(LD_BUG, "Failed to encode xoff cell");
    return;
  }

  if (connection_edge_send_command(stream, RELAY_COMMAND_XOFF,
                                   (char*)payload, (size_t)xoff_size) == 0) {
    stream->xoff_sent = true;
    cc_stats_flow_num_xoff_sent++;

    /* If this is an entry conn, notify control port */
    if (TO_CONN(stream)->type == CONN_TYPE_AP) {
      control_event_stream_status(TO_ENTRY_CONN(TO_CONN(stream)),
                                  STREAM_EVENT_XOFF_SENT,
                                  0);
    }
  }
}
/**
 * Compute the recent drain rate (write rate) for this edge
 * connection and return it, in KB/sec (1000 bytes/sec).
 *
 * Returns 0 if the monotime clock is busted.
 */
static inline uint32_t
compute_drain_rate(const edge_connection_t *stream)
{
  if (BUG(!is_monotime_clock_reliable())) {
    log_warn(LD_BUG, "Computing drain rate with stalled monotime clock");
    return 0;
  }

  uint64_t delta = monotime_absolute_usec() - stream->drain_start_usec;

  if (delta == 0) {
    log_warn(LD_BUG, "Computing stream drain rate with zero time delta");
    return 0;
  }

  /* Overflow checks */
  if (stream->prev_drained_bytes > INT32_MAX/1000 || /* Intermediate */
      stream->prev_drained_bytes/delta > INT32_MAX/1000) { /* full value */
    return INT32_MAX;
  }

  /* kb/sec = bytes/usec * 1000 usec/msec * 1000 msec/sec * kb/1000bytes */
  return MAX(1, (uint32_t)(stream->prev_drained_bytes * 1000)/delta);
}
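
/* Worked example of the units above (ours): if prev_drained_bytes is
 * 250000 and delta is 125000 usec, the stream drained 2 bytes/usec, i.e.
 * 2,000,000 bytes/sec. The expression (250000 * 1000)/125000 = 2000 gives
 * the same rate in KB/sec (1000-byte kilobytes), which is the unit of the
 * XON cell's kbps_ewma field. */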
/**
 * Send an XON for this stream, with appropriate advisory rate information.
 *
 * Reverts the xoff sent status, and stores the rate information we sent,
 * in case it changes.
 */
static void
circuit_send_stream_xon(edge_connection_t *stream)
{
  xon_cell_t xon;
  uint8_t payload[CELL_PAYLOAD_SIZE];
  ssize_t xon_size;

  memset(&xon, 0, sizeof(xon));
  memset(payload, 0, sizeof(payload));

  xon_cell_set_version(&xon, 0);
  xon_cell_set_kbps_ewma(&xon, stream->ewma_drain_rate);

  if ((xon_size = xon_cell_encode(payload, CELL_PAYLOAD_SIZE, &xon)) < 0) {
    log_warn(LD_BUG, "Failed to encode xon cell");
    return;
  }

  /* Store the advisory rate information, to send advisory updates if
   * it changes */
  stream->ewma_rate_last_sent = stream->ewma_drain_rate;

  if (connection_edge_send_command(stream, RELAY_COMMAND_XON, (char*)payload,
                                   (size_t)xon_size) == 0) {
    /* Revert the xoff sent status, so we can send another one if need be */
    stream->xoff_sent = false;

    cc_stats_flow_num_xon_sent++;

    /* If it's an entry conn, notify control port */
    if (TO_CONN(stream)->type == CONN_TYPE_AP) {
      control_event_stream_status(TO_ENTRY_CONN(TO_CONN(stream)),
                                  STREAM_EVENT_XON_SENT,
                                  0);
    }
  }
}
/**
 * Process a stream XOFF, parsing it, and then stopping reading on
 * the edge connection.
 *
 * Record that we have received an xoff, so we know not to resume
 * reading on this edge conn until we get an XON.
 *
 * Returns false if the XOFF did not validate; true if it did.
 */
bool
circuit_process_stream_xoff(edge_connection_t *conn,
                            const crypt_path_t *layer_hint,
                            const cell_t *cell)
{
  bool retval = true;

  (void)cell;

  if (BUG(!conn)) {
    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
           "Got XOFF on invalid stream?");
    return false;
  }

  /* Make sure this XOFF came from the right hop */
  if (!edge_uses_cpath(conn, layer_hint)) {
    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
           "Got XOFF from wrong hop.");
    return false;
  }

  if (!edge_uses_flow_control(conn)) {
    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
           "Got XOFF for non-congestion control circuit");
    return false;
  }

  if (conn->xoff_received) {
    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
           "Got multiple XOFF on connection");
    return false;
  }

  /* If we are near the max, scale everything down */
  if (conn->num_xoff_recv == XOFF_COUNT_SCALE_AT) {
    log_info(LD_EDGE, "Scaling down for XOFF count: %d %d %d",
             conn->total_bytes_xmit,
             conn->num_xoff_recv,
             conn->num_xon_recv);
    conn->total_bytes_xmit /= 2;
    conn->num_xoff_recv /= 2;
    conn->num_xon_recv /= 2;
  }

  conn->num_xoff_recv++;

  /* Client-side check to make sure that XOFF is not sent too early,
   * for dropmark attacks. The main sidechannel risk is early cells,
   * but we also check to make sure that we have not received more XOFFs
   * than could have been generated by the bytes we sent.
   */
  if (TO_CONN(conn)->type == CONN_TYPE_AP || conn->hs_ident != NULL) {
    uint32_t limit = 0;
    if (TO_CONN(conn)->type == CONN_TYPE_AP)
      limit = xoff_client;
    else
      limit = xoff_exit;

    if (conn->total_bytes_xmit < limit*conn->num_xoff_recv) {
      log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
             "Got extra XOFF for bytes sent. Got %d, expected max %d",
             conn->num_xoff_recv, conn->total_bytes_xmit/limit);
      /* We still process this, because the only dropmark defenses
       * in C tor are via the vanguards addon's use of the read valid
       * cells. So just signal that we think this is not valid protocol
       * data and proceed. */
      retval = false;
    }
  }

  log_info(LD_EDGE, "Got XOFF!");
  connection_stop_reading(TO_CONN(conn));
  conn->xoff_received = true;

  /* If this is an entry conn, notify control port */
  if (TO_CONN(conn)->type == CONN_TYPE_AP) {
    control_event_stream_status(TO_ENTRY_CONN(TO_CONN(conn)),
                                STREAM_EVENT_XOFF_RECV,
                                0);
  }

  return retval;
}
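
/* Worked example of the dropmark bound above (ours): with the consensus
 * defaults and a 498-byte relay payload, limit = xoff_client =
 * 500 * 498 = 249000 bytes. A third XOFF on a stream is only considered
 * valid protocol data once the client has sent at least
 * 3 * 249000 = 747000 bytes toward the exit; anything less trips the
 * "extra XOFF" warning while still processing the cell. */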
/**
 * Process a stream XON, and if it validates, clear the xoff
 * flag and resume reading on this edge connection.
 *
 * Also, use provided rate information to rate limit
 * reading on this edge (or packaging from it onto
 * the circuit), to avoid XON/XOFF chatter.
 *
 * Returns true if the XON validates, false otherwise.
 */
bool
circuit_process_stream_xon(edge_connection_t *conn,
                           const crypt_path_t *layer_hint,
                           const cell_t *cell)
{
  xon_cell_t *xon;
  bool retval = true;

  if (BUG(!conn)) {
    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
           "Got XON on invalid stream?");
    return false;
  }

  /* Make sure this XON came from the right hop */
  if (!edge_uses_cpath(conn, layer_hint)) {
    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
           "Got XON from wrong hop.");
    return false;
  }

  if (!edge_uses_flow_control(conn)) {
    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
           "Got XON for non-congestion control circuit");
    return false;
  }

  if (xon_cell_parse(&xon, cell->payload+RELAY_HEADER_SIZE,
                     CELL_PAYLOAD_SIZE-RELAY_HEADER_SIZE) < 0) {
    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
           "Received malformed XON cell.");
    return false;
  }

  /* If we are near the max, scale everything down */
  if (conn->num_xon_recv == XON_COUNT_SCALE_AT) {
    log_info(LD_EDGE, "Scaling down for XON count: %d %d %d",
             conn->total_bytes_xmit,
             conn->num_xoff_recv,
             conn->num_xon_recv);
    conn->total_bytes_xmit /= 2;
    conn->num_xoff_recv /= 2;
    conn->num_xon_recv /= 2;
  }

  conn->num_xon_recv++;

  /* Client-side check to make sure that XON is not sent too early,
   * for dropmark attacks. The main sidechannel risk is early cells,
   * but we also check to see that we did not get more XONs than make
   * sense for the number of bytes we sent.
   */
  if (TO_CONN(conn)->type == CONN_TYPE_AP || conn->hs_ident != NULL) {
    uint32_t limit = 0;

    if (TO_CONN(conn)->type == CONN_TYPE_AP)
      limit = MIN(xoff_client, xon_rate_bytes);
    else
      limit = MIN(xoff_exit, xon_rate_bytes);

    if (conn->total_bytes_xmit < limit*conn->num_xon_recv) {
      log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
             "Got extra XON for bytes sent. Got %d, expected max %d",
             conn->num_xon_recv, conn->total_bytes_xmit/limit);
      /* We still process this, because the only dropmark defenses
       * in C tor are via the vanguards addon's use of the read valid
       * cells. So just signal that we think this is not valid protocol
       * data and proceed. */
      retval = false;
    }
  }

  log_info(LD_EDGE, "Got XON: %d", xon->kbps_ewma);

  /* Adjust the token bucket of this edge connection with the drain rate in
   * the XON. The kbps_ewma field is in KB/sec (1000 bytes/sec), so
   * multiplying by 1000 yields bytes/sec. */
  uint64_t rate = ((uint64_t) xon_cell_get_kbps_ewma(xon) * 1000);
  if (rate == 0 || INT32_MAX < rate) {
    /* No rate advertised (or out of range): disable the rate limit. */
    rate = INT32_MAX;
  }
  token_bucket_rw_adjust(&conn->bucket, (uint32_t) rate, (uint32_t) rate);

  if (conn->xoff_received) {
    /* Clear the fact that we got an XOFF, so that this edge can
     * start and stop reading normally */
    conn->xoff_received = false;
    connection_start_reading(TO_CONN(conn));
  }

  /* If this is an entry conn, notify control port */
  if (TO_CONN(conn)->type == CONN_TYPE_AP) {
    control_event_stream_status(TO_ENTRY_CONN(TO_CONN(conn)),
                                STREAM_EVENT_XON_RECV,
                                0);
  }

  xon_cell_free(xon);

  return retval;
}
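
/* Worked example of the conversion above (ours): an XON advertising
 * kbps_ewma = 2000 means the peer drained roughly 2000 KB/sec, so the
 * read token bucket is set to 2000 * 1000 = 2,000,000 bytes/sec. An
 * advertised rate of 0 would otherwise behave like a permanent XOFF,
 * which is why it is mapped to INT32_MAX (effectively unlimited)
 * instead. */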
/**
 * Called from sendme_stream_data_received(), when data arrives
 * from a circuit to our edge's outbuf, to decide if we need to send
 * an XOFF.
 *
 * Returns the amount of cells remaining until the buffer is full, at
 * which point it sends an XOFF, and returns 0.
 *
 * Returns less than 0 if we have queued more than a congestion window
 * worth of data and need to close the circuit.
 */
int
flow_control_decide_xoff(edge_connection_t *stream)
{
  size_t total_buffered = connection_get_outbuf_len(TO_CONN(stream));
  uint32_t buffer_limit_xoff = 0;

  if (BUG(!edge_uses_flow_control(stream))) {
    log_err(LD_BUG, "Flow control called for non-congestion control circuit");
    return -1;
  }

  /* Onion services and clients are typically localhost edges, so they
   * need different buffering limits than exits do */
  if (TO_CONN(stream)->type == CONN_TYPE_AP || stream->hs_ident != NULL) {
    buffer_limit_xoff = xoff_client;
  } else {
    buffer_limit_xoff = xoff_exit;
  }

  if (total_buffered > buffer_limit_xoff) {
    if (!stream->xoff_sent) {
      log_info(LD_EDGE, "Sending XOFF: %"TOR_PRIuSZ" %d",
               total_buffered, buffer_limit_xoff);
      tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xoff_sending), stream);

      cc_stats_flow_xoff_outbuf_ma =
        stats_update_running_avg(cc_stats_flow_xoff_outbuf_ma,
                                 total_buffered);

      circuit_send_stream_xoff(stream);

      /* Clear the drain rate. It is considered wrong if we
       * got all the way to XOFF */
      stream->ewma_drain_rate = 0;
    }
  }

  /* If the outbuf has accumulated more than the expected burst limit of
   * cells, then assume it is not draining, and call decide_xon. We must
   * do this because writes only happen when the socket unblocks, so
   * we may not otherwise notice accumulation of data in the outbuf for
   * advisory XONs. */
  if (total_buffered > MAX_EXPECTED_CELL_BURST*RELAY_PAYLOAD_SIZE) {
    flow_control_decide_xon(stream, 0);
  }

  /* Flow control always takes more data; we rely on the oomkiller to
   * handle misbehavior. */
  return 0;
}
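
/* Size note (ours): assuming the 498-byte relay payload, the burst
 * threshold above is 32 * 498 = 15936 bytes (about 15.6 KiB), well below
 * the ~243 KiB default XOFF limits. A stuck outbuf therefore starts
 * advisory XON bookkeeping long before an outright XOFF is warranted. */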
/**
 * Returns true if the stream's drain rate has changed significantly.
 *
 * Returns false if the monotime clock is stalled, or if we have
 * no previous drain rate information.
 */
static bool
stream_drain_rate_changed(const edge_connection_t *stream)
{
  if (!is_monotime_clock_reliable()) {
    return false;
  }

  if (!stream->ewma_rate_last_sent) {
    return false;
  }

  if (stream->ewma_drain_rate >
      (100+(uint64_t)xon_change_pct)*stream->ewma_rate_last_sent/100) {
    return true;
  }

  if (stream->ewma_drain_rate <
      (100-(uint64_t)xon_change_pct)*stream->ewma_rate_last_sent/100) {
    return true;
  }

  return false;
}
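
/* Worked example (ours): with the cc_xon_change_pct default of 25 and an
 * ewma_rate_last_sent of 1000 KB/sec, a new ewma_drain_rate above
 * 125*1000/100 = 1250 or below 75*1000/100 = 750 counts as a significant
 * change and triggers an advisory XON update. */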
/**
 * Called whenever we drain an edge connection outbuf by writing on
 * its socket, to decide if it is time to send an XON.
 *
 * The n_written parameter tells us how many bytes we have written
 * this time, which is used to compute the advisory drain rate fields.
 */
void
flow_control_decide_xon(edge_connection_t *stream, size_t n_written)
{
  size_t total_buffered = connection_get_outbuf_len(TO_CONN(stream));

  /* Bounds check the number of drained bytes, and scale */
  if (stream->drained_bytes >= UINT32_MAX - n_written) {
    /* Cut the bytes in half, and move the start time up halfway to now
     * (if we have one). */
    stream->drained_bytes /= 2;

    if (stream->drain_start_usec) {
      uint64_t now = monotime_absolute_usec();

      stream->drain_start_usec = now - (now-stream->drain_start_usec)/2;
    }
  }

  /* Accumulate drained bytes since last rate computation */
  stream->drained_bytes += n_written;

  tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon), stream, n_written);

  /* Check for bad monotime clock and bytecount wrap */
  if (!is_monotime_clock_reliable()) {
    /* If the monotime clock ever goes wrong, the safest thing to do
     * is just clear our short-term rate info and wait for the clock to
     * become reliable again. */
    stream->drain_start_usec = 0;
    stream->drained_bytes = 0;
  } else {
    /* If we have no drain start timestamp, and we still have
     * remaining buffer, start the buffering counter */
    if (!stream->drain_start_usec && total_buffered > 0) {
      log_debug(LD_EDGE, "Began edge buffering: %d %d %"TOR_PRIuSZ,
                stream->ewma_rate_last_sent,
                stream->ewma_drain_rate,
                total_buffered);
      tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_drain_start),
                stream);
      stream->drain_start_usec = monotime_absolute_usec();
      stream->drained_bytes = 0;
    }

    if (stream->drain_start_usec) {
      /* If we have spent enough time in a queued state, update our drain
       * rate. */
      if (stream->drained_bytes > xon_rate_bytes) {
        /* No previous drained bytes means it is the first time we are
         * computing it, so use the value we just drained onto the socket as
         * a baseline. It won't be accurate but it will be a start towards
         * the right value.
         *
         * We have to do this in order to have a drain rate, else we could be
         * sending a drain rate of 0 in an XON which would be undesirable and
         * basically like sending an XOFF. */
        if (stream->prev_drained_bytes == 0) {
          stream->prev_drained_bytes = stream->drained_bytes;
        }
        uint32_t drain_rate = compute_drain_rate(stream);
        /* Once the drain rate has been computed, note how many bytes we just
         * drained so it can be used at the next calculation. We do this here
         * because it gets reset once the rate is changed. */
        stream->prev_drained_bytes = stream->drained_bytes;

        if (drain_rate) {
          stream->ewma_drain_rate =
            (uint32_t)n_count_ewma(drain_rate,
                                   stream->ewma_drain_rate,
                                   xon_ewma_cnt);
          log_debug(LD_EDGE, "Updating drain rate: %d %d %"TOR_PRIuSZ,
                    drain_rate,
                    stream->ewma_drain_rate,
                    total_buffered);
          tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_drain_update),
                    stream, drain_rate);
          /* Reset recent byte counts. This prevents us from sending advisory
           * XONs more frequently than every xon_rate_bytes. */
          stream->drained_bytes = 0;
          stream->drain_start_usec = 0;
        }
      }
    }
  }
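
  /* EWMA note (ours): assuming n_count_ewma() follows the N-EWMA from
   * proposal 324, the update above is roughly
   *   new = (2*curr + (N-1)*prev)/(N+1).
   * With the cc_xon_ewma_cnt default of N=2, a fresh drain_rate of 900
   * against a previous ewma of 600 yields (1800 + 600)/3 = 800 KB/sec. */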
  /* If we don't have an XOFF outstanding, consider updating an
   * old rate */
  if (!stream->xoff_sent) {
    if (stream_drain_rate_changed(stream)) {
      /* If we are still buffering and the rate changed, update
       * the advisory XON */
      log_info(LD_EDGE, "Sending rate-change XON: %d %d %"TOR_PRIuSZ,
               stream->ewma_rate_last_sent,
               stream->ewma_drain_rate,
               total_buffered);
      tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_rate_change), stream);

      cc_stats_flow_xon_outbuf_ma =
        stats_update_running_avg(cc_stats_flow_xon_outbuf_ma,
                                 total_buffered);

      circuit_send_stream_xon(stream);
    }
  } else if (total_buffered == 0) {
    log_info(LD_EDGE, "Sending XON: %d %d %"TOR_PRIuSZ,
             stream->ewma_rate_last_sent,
             stream->ewma_drain_rate,
             total_buffered);
    tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_partial_drain), stream);
    circuit_send_stream_xon(stream);
  }
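
  /* Summary (ours): XONs are thus sent on exactly two triggers. With no
   * XOFF outstanding, a significant drain-rate change produces an advisory
   * rate-update XON; with an XOFF outstanding, fully draining the outbuf
   * produces the XON that resumes the sender. */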
  /* If the buffer has fully emptied, clear the drain timestamp,
   * so we can total only bytes drained while outbuf is 0. */
  if (total_buffered == 0) {
    stream->drain_start_usec = 0;

    /* After we've spent 'xon_rate_bytes' with the queue fully drained,
     * double any rate we sent. */
    if (stream->drained_bytes >= xon_rate_bytes &&
        stream->ewma_rate_last_sent) {
      stream->ewma_drain_rate = MIN(INT32_MAX, 2*stream->ewma_drain_rate);
      log_info(LD_EDGE,
               "Queue empty for xon_rate_limit bytes: %d %d",
               stream->ewma_rate_last_sent,
               stream->ewma_drain_rate);
      tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_drain_doubled), stream);
      /* Reset the drained bytes count. We need to keep its value as a
       * previous so the drain rate calculation takes into account what was
       * actually drained the last time. */
      stream->prev_drained_bytes = stream->drained_bytes;
      stream->drained_bytes = 0;
    }
  }
}
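
/* Worked example of the doubling above (ours): if the advertised rate was
 * last 800 KB/sec but the queue then stays empty for xon_rate_bytes worth
 * of writes, the local estimate was too low; ewma_drain_rate is bumped to
 * MIN(INT32_MAX, 1600) = 1600 KB/sec so the next advisory XON lets the
 * sender speed up. */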
/**
 * Note that we packaged some data on this stream. Used to enforce
 * client-side dropmark limits.
 */
void
flow_control_note_sent_data(edge_connection_t *stream, size_t len)
{
  /* If we are near the max, scale everything down */
  if (stream->total_bytes_xmit >= TOTAL_XMIT_SCALE_AT-len) {
    log_info(LD_EDGE, "Scaling down for flow control xmit bytes: %d %d %d",
             stream->total_bytes_xmit,
             stream->num_xoff_recv,
             stream->num_xon_recv);

    stream->total_bytes_xmit /= 2;
    stream->num_xoff_recv /= 2;
    stream->num_xon_recv /= 2;
  }

  stream->total_bytes_xmit += len;
}
/** Returns true if an edge connection uses flow control */
bool
edge_uses_flow_control(const edge_connection_t *stream)
{
  bool ret = (stream->on_circuit && stream->on_circuit->ccontrol) ||
             (stream->cpath_layer && stream->cpath_layer->ccontrol);

  /* All circuits with congestion control use flow control */
  return ret;
}

/** Returns true if a connection is an edge conn that uses flow control */
bool
conn_uses_flow_control(connection_t *conn)
{
  bool ret = false;

  if (CONN_IS_EDGE(conn)) {
    edge_connection_t *edge = TO_EDGE_CONN(conn);

    if (edge_uses_flow_control(edge)) {
      ret = true;
    }
  }

  return ret;
}