1 /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim:set ts=2 sw=2 sts=2 et cindent: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "EventTokenBucket.h"
9 #include "nsICancelable.h"
10 #include "nsIIOService.h"
12 #include "nsNetUtil.h"
13 #include "nsServiceManagerUtils.h"
14 #include "nsSocketTransportService2.h"
16 #include "mozilla/Components.h"
19 # include "MainThreadUtils.h"
24 # include <mmsystem.h>
30 ////////////////////////////////////////////
31 // EventTokenBucketCancelable
32 ////////////////////////////////////////////
34 class TokenBucketCancelable
: public nsICancelable
{
36 NS_DECL_THREADSAFE_ISUPPORTS
39 explicit TokenBucketCancelable(class ATokenBucketEvent
* event
);
43 virtual ~TokenBucketCancelable() = default;
45 friend class EventTokenBucket
;
46 ATokenBucketEvent
* mEvent
;
49 NS_IMPL_ISUPPORTS(TokenBucketCancelable
, nsICancelable
)
51 TokenBucketCancelable::TokenBucketCancelable(ATokenBucketEvent
* event
)
55 TokenBucketCancelable::Cancel(nsresult reason
) {
56 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
61 void TokenBucketCancelable::Fire() {
64 ATokenBucketEvent
* event
= mEvent
;
66 event
->OnTokenBucketAdmitted();
69 ////////////////////////////////////////////
71 ////////////////////////////////////////////
73 NS_IMPL_ISUPPORTS(EventTokenBucket
, nsITimerCallback
, nsINamed
)
75 // by default 1hz with no burst
76 EventTokenBucket::EventTokenBucket(uint32_t eventsPerSecond
, uint32_t burstSize
)
77 : mUnitCost(kUsecPerSec
),
78 mMaxCredit(kUsecPerSec
),
85 mFineGrainTimerInUse(false),
86 mFineGrainResetTimerArmed(false)
89 mLastUpdate
= TimeStamp::Now();
91 MOZ_ASSERT(NS_IsMainThread());
94 nsCOMPtr
<nsIEventTarget
> sts
;
95 nsCOMPtr
<nsIIOService
> ioService
= do_GetIOService(&rv
);
96 if (NS_SUCCEEDED(rv
)) {
97 sts
= mozilla::components::SocketTransport::Service(&rv
);
99 if (NS_SUCCEEDED(rv
)) mTimer
= NS_NewTimer(sts
);
100 SetRate(eventsPerSecond
, burstSize
);
103 EventTokenBucket::~EventTokenBucket() {
105 ("EventTokenBucket::dtor %p events=%zu\n", this, mEvents
.GetSize()));
109 // Complete any queued events to prevent hangs
110 while (mEvents
.GetSize()) {
111 RefPtr
<TokenBucketCancelable
> cancelable
= mEvents
.PopFront();
116 void EventTokenBucket::CleanupTimers() {
117 if (mTimer
&& mTimerArmed
) {
125 if (mFineGrainResetTimer
&& mFineGrainResetTimerArmed
) {
126 mFineGrainResetTimer
->Cancel();
128 mFineGrainResetTimer
= nullptr;
129 mFineGrainResetTimerArmed
= false;
133 void EventTokenBucket::SetRate(uint32_t eventsPerSecond
, uint32_t burstSize
) {
134 SOCKET_LOG(("EventTokenBucket::SetRate %p %u %u\n", this, eventsPerSecond
,
137 if (eventsPerSecond
> kMaxHz
) {
138 eventsPerSecond
= kMaxHz
;
139 SOCKET_LOG((" eventsPerSecond out of range\n"));
142 if (!eventsPerSecond
) {
144 SOCKET_LOG((" eventsPerSecond out of range\n"));
147 mUnitCost
= kUsecPerSec
/ eventsPerSecond
;
148 mMaxCredit
= mUnitCost
* burstSize
;
149 if (mMaxCredit
> kUsecPerSec
* 60 * 15) {
150 SOCKET_LOG((" burstSize out of range\n"));
151 mMaxCredit
= kUsecPerSec
* 60 * 15;
153 mCredit
= mMaxCredit
;
154 mLastUpdate
= TimeStamp::Now();
157 void EventTokenBucket::ClearCredits() {
158 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
159 SOCKET_LOG(("EventTokenBucket::ClearCredits %p\n", this));
163 uint32_t EventTokenBucket::BurstEventsAvailable() {
164 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
165 return static_cast<uint32_t>(mCredit
/ mUnitCost
);
168 uint32_t EventTokenBucket::QueuedEvents() {
169 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
170 return mEvents
.GetSize();
173 void EventTokenBucket::Pause() {
174 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
175 SOCKET_LOG(("EventTokenBucket::Pause %p\n", this));
176 if (mPaused
|| mStopped
) return;
185 void EventTokenBucket::UnPause() {
186 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
187 SOCKET_LOG(("EventTokenBucket::UnPause %p\n", this));
188 if (!mPaused
|| mStopped
) return;
195 void EventTokenBucket::Stop() {
196 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
197 SOCKET_LOG(("EventTokenBucket::Stop %p armed=%d\n", this, mTimerArmed
));
201 // Complete any queued events to prevent hangs
202 while (mEvents
.GetSize()) {
203 RefPtr
<TokenBucketCancelable
> cancelable
= mEvents
.PopFront();
208 nsresult
EventTokenBucket::SubmitEvent(ATokenBucketEvent
* event
,
209 nsICancelable
** cancelable
) {
210 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
211 SOCKET_LOG(("EventTokenBucket::SubmitEvent %p\n", this));
213 if (mStopped
|| !mTimer
) return NS_ERROR_FAILURE
;
217 RefPtr
<TokenBucketCancelable
> cancelEvent
= new TokenBucketCancelable(event
);
218 // When this function exits the cancelEvent needs 2 references, one for the
219 // mEvents queue and one for the caller of SubmitEvent()
221 *cancelable
= do_AddRef(cancelEvent
).take();
223 if (mPaused
|| !TryImmediateDispatch(cancelEvent
.get())) {
225 SOCKET_LOG((" queued\n"));
226 mEvents
.Push(cancelEvent
.forget());
229 SOCKET_LOG((" dispatched synchronously\n"));
235 bool EventTokenBucket::TryImmediateDispatch(TokenBucketCancelable
* cancelable
) {
236 if (mCredit
< mUnitCost
) return false;
238 mCredit
-= mUnitCost
;
243 void EventTokenBucket::DispatchEvents() {
244 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
245 SOCKET_LOG(("EventTokenBucket::DispatchEvents %p %d\n", this, mPaused
));
246 if (mPaused
|| mStopped
) return;
248 while (mEvents
.GetSize() && mUnitCost
<= mCredit
) {
249 RefPtr
<TokenBucketCancelable
> cancelable
= mEvents
.PopFront();
250 if (cancelable
->mEvent
) {
252 ("EventTokenBucket::DispachEvents [%p] "
253 "Dispatching queue token bucket event cost=%" PRIu64
254 " credit=%" PRIu64
"\n",
255 this, mUnitCost
, mCredit
));
256 mCredit
-= mUnitCost
;
262 if (!mEvents
.GetSize()) WantNormalTimers();
266 void EventTokenBucket::UpdateTimer() {
267 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
268 if (mTimerArmed
|| mPaused
|| mStopped
|| !mEvents
.GetSize() || !mTimer
) {
272 if (mCredit
>= mUnitCost
) return;
274 // determine the time needed to wait to accumulate enough credits to admit
275 // one more event and set the timer for that point. Always round it
276 // up because firing early doesn't help.
278 uint64_t deficit
= mUnitCost
- mCredit
;
279 uint64_t msecWait
= (deficit
+ (kUsecPerMsec
- 1)) / kUsecPerMsec
;
281 if (msecWait
< 4) { // minimum wait
283 } else if (msecWait
> 60000) { // maximum wait
292 ("EventTokenBucket::UpdateTimer %p for %" PRIu64
"ms\n", this, msecWait
));
293 nsresult rv
= mTimer
->InitWithCallback(this, static_cast<uint32_t>(msecWait
),
294 nsITimer::TYPE_ONE_SHOT
);
295 mTimerArmed
= NS_SUCCEEDED(rv
);
299 EventTokenBucket::Notify(nsITimer
* timer
) {
300 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
303 if (timer
== mFineGrainResetTimer
) {
304 FineGrainResetTimerNotify();
309 SOCKET_LOG(("EventTokenBucket::Notify() %p\n", this));
311 if (mStopped
) return NS_OK
;
321 EventTokenBucket::GetName(nsACString
& aName
) {
322 aName
.AssignLiteral("EventTokenBucket");
326 void EventTokenBucket::UpdateCredits() {
327 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
329 TimeStamp now
= TimeStamp::Now();
330 TimeDuration elapsed
= now
- mLastUpdate
;
333 mCredit
+= static_cast<uint64_t>(elapsed
.ToMicroseconds());
334 if (mCredit
> mMaxCredit
) mCredit
= mMaxCredit
;
335 SOCKET_LOG(("EventTokenBucket::UpdateCredits %p to %" PRIu64
" (%" PRIu64
337 this, mCredit
, mUnitCost
, (double)mCredit
/ mUnitCost
));
341 void EventTokenBucket::FineGrainTimers() {
342 SOCKET_LOG(("EventTokenBucket::FineGrainTimers %p mFineGrainTimerInUse=%d\n",
343 this, mFineGrainTimerInUse
));
345 mLastFineGrainTimerUse
= TimeStamp::Now();
347 if (mFineGrainTimerInUse
) return;
349 if (mUnitCost
> kCostFineGrainThreshold
) return;
352 ("EventTokenBucket::FineGrainTimers %p timeBeginPeriod()\n", this));
354 mFineGrainTimerInUse
= true;
358 void EventTokenBucket::NormalTimers() {
359 if (!mFineGrainTimerInUse
) return;
360 mFineGrainTimerInUse
= false;
362 SOCKET_LOG(("EventTokenBucket::NormalTimers %p timeEndPeriod()\n", this));
366 void EventTokenBucket::WantNormalTimers() {
367 if (!mFineGrainTimerInUse
) return;
368 if (mFineGrainResetTimerArmed
) return;
370 TimeDuration
elapsed(TimeStamp::Now() - mLastFineGrainTimerUse
);
371 static const TimeDuration fiveSeconds
= TimeDuration::FromSeconds(5);
373 if (elapsed
>= fiveSeconds
) {
378 if (!mFineGrainResetTimer
) mFineGrainResetTimer
= NS_NewTimer();
380 // if we can't delay the reset, just do it now
381 if (!mFineGrainResetTimer
) {
386 // pad the callback out 100ms to avoid having to round trip this again if the
387 // timer calls back just a tad early.
389 ("EventTokenBucket::WantNormalTimers %p "
390 "Will reset timer granularity after delay",
393 mFineGrainResetTimer
->InitWithCallback(
395 static_cast<uint32_t>((fiveSeconds
- elapsed
).ToMilliseconds()) + 100,
396 nsITimer::TYPE_ONE_SHOT
);
397 mFineGrainResetTimerArmed
= true;
400 void EventTokenBucket::FineGrainResetTimerNotify() {
401 SOCKET_LOG(("EventTokenBucket::FineGrainResetTimerNotify(%p) events = %zd\n",
402 this, mEvents
.GetSize()));
403 mFineGrainResetTimerArmed
= false;
405 // If we are currently processing events then wait for the queue to drain
406 // before trying to reset back to normal timers again
407 if (!mEvents
.GetSize()) WantNormalTimers();
413 } // namespace mozilla