1 -- Because we deal with pre-authed sessions and streams we can't be host-specific
4 local filters
= require
"util.filters";
5 local throttle
= require
"util.throttle";
6 local timer
= require
"util.timer";
7 local ceil = math
.ceil;
9 local limits_cfg
= module
:get_option("limits", {});
10 local limits_resolution
= module
:get_option_number("limits_resolution", 1);
12 local default_bytes_per_second
= 3000;
13 local default_burst
= 2;
15 local rate_units
= { b
= 1, k
= 3, m
= 6, g
= 9, t
= 12 } -- Plan for the future.
16 local function parse_rate(rate
, sess_type
)
17 local quantity
, unit
, exp;
19 quantity
, unit
= rate
:match("^(%d+) ?([^/]+)/s$");
20 exp = quantity
and rate_units
[unit
:sub(1,1):lower()];
23 module
:log("error", "Error parsing rate for %s: %q, using default rate (%d bytes/s)", sess_type
, rate
, default_bytes_per_second
);
24 return default_bytes_per_second
;
26 return quantity
*(10^
exp);
29 local function parse_burst(burst
, sess_type
)
30 if type(burst
) == "string" then
31 burst
= burst
:match("^(%d+) ?s$");
33 local n_burst
= tonumber(burst
);
35 module
:log("error", "Unable to parse burst for %s: %q, using default burst interval (%ds)", sess_type
, tostring(burst
), default_burst
);
37 return n_burst
or default_burst
;
40 -- Process config option into limits table:
41 -- limits = { c2s = { bytes_per_second = X, burst_seconds = Y } }
44 for sess_type
, sess_limits
in pairs(limits_cfg
) do
46 bytes_per_second
= parse_rate(sess_limits
.rate
, sess_type
);
47 burst_seconds
= parse_burst(sess_limits
.burst
, sess_type
);
51 local default_filter_set
= {};
53 function default_filter_set
.bytes_in(bytes
, session
)
54 local sess_throttle
= session
.throttle
;
56 local ok
, balance
, outstanding
= sess_throttle
:poll(#bytes
, true);
58 session
.log("debug", "Session over rate limit (%d) with %d (by %d), pausing", sess_throttle
.max, #bytes
, outstanding
);
59 outstanding
= ceil(outstanding
);
60 session
.conn
:pause(); -- Read no more data from the connection until there is no outstanding data
61 local outstanding_data
= bytes
:sub(-outstanding
);
62 bytes
= bytes
:sub(1, #bytes
-outstanding
);
63 timer
.add_task(limits_resolution
, function ()
64 if not session
.conn
then return; end
65 if sess_throttle
:peek(#outstanding_data
) then
66 session
.log("debug", "Resuming paused session");
67 session
.conn
:resume();
69 -- Handle what we can of the outstanding data
70 session
.data(outstanding_data
);
77 local type_filters
= {
78 c2s
= default_filter_set
;
79 s2sin
= default_filter_set
;
80 s2sout
= default_filter_set
;
83 local function filter_hook(session
)
84 local session_type
= session
.type:match("^[^_]+");
85 local filter_set
, opts
= type_filters
[session_type
], limits
[session_type
];
87 session
.throttle
= throttle
.create(opts
.bytes_per_second
* opts
.burst_seconds
, opts
.burst_seconds
);
88 filters
.add_filter(session
, "bytes/in", filter_set
.bytes_in
, 1000);
92 function module
.load()
93 filters
.add_filter_hook(filter_hook
);
96 function module
.unload()
97 filters
.remove_filter_hook(filter_hook
);