1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "ThrottleQueue.h"
8 #include "mozilla/Components.h"
9 #include "mozilla/net/InputChannelThrottleQueueParent.h"
10 #include "nsISeekableStream.h"
11 #include "nsIAsyncInputStream.h"
12 #include "nsIOService.h"
13 #include "nsSocketTransportService2.h"
14 #include "nsStreamUtils.h"
15 #include "nsNetUtil.h"
20 //-----------------------------------------------------------------------------
22 class ThrottleInputStream final
: public nsIAsyncInputStream
,
23 public nsISeekableStream
{
25 ThrottleInputStream(nsIInputStream
* aStream
, ThrottleQueue
* aQueue
);
27 NS_DECL_THREADSAFE_ISUPPORTS
28 NS_DECL_NSIINPUTSTREAM
29 NS_DECL_NSISEEKABLESTREAM
30 NS_DECL_NSITELLABLESTREAM
31 NS_DECL_NSIASYNCINPUTSTREAM
36 ~ThrottleInputStream();
38 nsCOMPtr
<nsIInputStream
> mStream
;
39 RefPtr
<ThrottleQueue
> mQueue
;
40 nsresult mClosedStatus
;
42 nsCOMPtr
<nsIInputStreamCallback
> mCallback
;
43 nsCOMPtr
<nsIEventTarget
> mEventTarget
;
46 NS_IMPL_ISUPPORTS(ThrottleInputStream
, nsIAsyncInputStream
, nsIInputStream
,
47 nsITellableStream
, nsISeekableStream
)
49 ThrottleInputStream::ThrottleInputStream(nsIInputStream
* aStream
,
50 ThrottleQueue
* aQueue
)
51 : mStream(aStream
), mQueue(aQueue
), mClosedStatus(NS_OK
) {
52 MOZ_ASSERT(aQueue
!= nullptr);
55 ThrottleInputStream::~ThrottleInputStream() { Close(); }
58 ThrottleInputStream::Close() {
59 if (NS_FAILED(mClosedStatus
)) {
64 mQueue
->DequeueStream(this);
66 mClosedStatus
= NS_BASE_STREAM_CLOSED
;
68 return mStream
->Close();
72 ThrottleInputStream::Available(uint64_t* aResult
) {
73 if (NS_FAILED(mClosedStatus
)) {
77 return mStream
->Available(aResult
);
81 ThrottleInputStream::StreamStatus() {
82 if (NS_FAILED(mClosedStatus
)) {
86 return mStream
->StreamStatus();
90 ThrottleInputStream::Read(char* aBuf
, uint32_t aCount
, uint32_t* aResult
) {
91 if (NS_FAILED(mClosedStatus
)) {
96 nsresult rv
= mQueue
->Available(aCount
, &realCount
);
101 if (realCount
== 0) {
102 return NS_BASE_STREAM_WOULD_BLOCK
;
105 rv
= mStream
->Read(aBuf
, realCount
, aResult
);
106 if (NS_SUCCEEDED(rv
) && *aResult
> 0) {
107 mQueue
->RecordRead(*aResult
);
113 ThrottleInputStream::ReadSegments(nsWriteSegmentFun aWriter
, void* aClosure
,
114 uint32_t aCount
, uint32_t* aResult
) {
115 if (NS_FAILED(mClosedStatus
)) {
116 return mClosedStatus
;
120 nsresult rv
= mQueue
->Available(aCount
, &realCount
);
124 MOZ_ASSERT(realCount
<= aCount
);
126 if (realCount
== 0) {
127 return NS_BASE_STREAM_WOULD_BLOCK
;
130 rv
= mStream
->ReadSegments(aWriter
, aClosure
, realCount
, aResult
);
131 if (NS_SUCCEEDED(rv
) && *aResult
> 0) {
132 mQueue
->RecordRead(*aResult
);
138 ThrottleInputStream::IsNonBlocking(bool* aNonBlocking
) {
139 *aNonBlocking
= true;
144 ThrottleInputStream::Seek(int32_t aWhence
, int64_t aOffset
) {
145 if (NS_FAILED(mClosedStatus
)) {
146 return mClosedStatus
;
149 nsCOMPtr
<nsISeekableStream
> sstream
= do_QueryInterface(mStream
);
151 return NS_ERROR_FAILURE
;
154 return sstream
->Seek(aWhence
, aOffset
);
158 ThrottleInputStream::Tell(int64_t* aResult
) {
159 if (NS_FAILED(mClosedStatus
)) {
160 return mClosedStatus
;
163 nsCOMPtr
<nsITellableStream
> sstream
= do_QueryInterface(mStream
);
165 return NS_ERROR_FAILURE
;
168 return sstream
->Tell(aResult
);
172 ThrottleInputStream::SetEOF() {
173 if (NS_FAILED(mClosedStatus
)) {
174 return mClosedStatus
;
177 nsCOMPtr
<nsISeekableStream
> sstream
= do_QueryInterface(mStream
);
179 return NS_ERROR_FAILURE
;
182 return sstream
->SetEOF();
186 ThrottleInputStream::CloseWithStatus(nsresult aStatus
) {
187 if (NS_FAILED(mClosedStatus
)) {
188 // Already closed, ignore.
191 if (NS_SUCCEEDED(aStatus
)) {
192 aStatus
= NS_BASE_STREAM_CLOSED
;
195 mClosedStatus
= Close();
196 if (NS_SUCCEEDED(mClosedStatus
)) {
197 mClosedStatus
= aStatus
;
203 ThrottleInputStream::AsyncWait(nsIInputStreamCallback
* aCallback
,
204 uint32_t aFlags
, uint32_t aRequestedCount
,
205 nsIEventTarget
* aEventTarget
) {
207 return NS_ERROR_ILLEGAL_VALUE
;
210 mCallback
= aCallback
;
211 mEventTarget
= aEventTarget
;
213 mQueue
->QueueStream(this);
215 mQueue
->DequeueStream(this);
220 void ThrottleInputStream::AllowInput() {
221 MOZ_ASSERT(mCallback
);
222 nsCOMPtr
<nsIInputStreamCallback
> callbackEvent
= NS_NewInputStreamReadyEvent(
223 "ThrottleInputStream::AllowInput", mCallback
, mEventTarget
);
225 mEventTarget
= nullptr;
226 callbackEvent
->OnInputStreamReady(this);
229 //-----------------------------------------------------------------------------
232 already_AddRefed
<nsIInputChannelThrottleQueue
> ThrottleQueue::Create() {
233 MOZ_ASSERT(XRE_IsParentProcess());
235 nsCOMPtr
<nsIInputChannelThrottleQueue
> tq
;
236 if (nsIOService::UseSocketProcess()) {
237 tq
= new InputChannelThrottleQueueParent();
239 tq
= new ThrottleQueue();
245 NS_IMPL_ISUPPORTS(ThrottleQueue
, nsIInputChannelThrottleQueue
, nsITimerCallback
,
248 ThrottleQueue::ThrottleQueue()
252 nsCOMPtr
<nsIEventTarget
> sts
;
253 nsCOMPtr
<nsIIOService
> ioService
= do_GetIOService(&rv
);
254 if (NS_SUCCEEDED(rv
)) {
255 sts
= mozilla::components::SocketTransport::Service(&rv
);
257 if (NS_SUCCEEDED(rv
)) mTimer
= NS_NewTimer(sts
);
260 ThrottleQueue::~ThrottleQueue() {
261 if (mTimer
&& mTimerArmed
) {
268 ThrottleQueue::RecordRead(uint32_t aBytesRead
) {
269 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
271 entry
.mTime
= TimeStamp::Now();
272 entry
.mBytesRead
= aBytesRead
;
273 mReadEvents
.AppendElement(entry
);
274 mBytesProcessed
+= aBytesRead
;
279 ThrottleQueue::Available(uint32_t aRemaining
, uint32_t* aAvailable
) {
280 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
281 TimeStamp now
= TimeStamp::Now();
282 TimeStamp oneSecondAgo
= now
- TimeDuration::FromSeconds(1);
285 // Remove all stale events.
286 for (i
= 0; i
< mReadEvents
.Length(); ++i
) {
287 if (mReadEvents
[i
].mTime
>= oneSecondAgo
) {
291 mReadEvents
.RemoveElementsAt(0, i
);
293 uint32_t totalBytes
= 0;
294 for (i
= 0; i
< mReadEvents
.Length(); ++i
) {
295 totalBytes
+= mReadEvents
[i
].mBytesRead
;
298 uint32_t spread
= mMaxBytesPerSecond
- mMeanBytesPerSecond
;
299 double prob
= static_cast<double>(rand()) / RAND_MAX
;
300 uint32_t thisSliceBytes
=
301 mMeanBytesPerSecond
- spread
+ static_cast<uint32_t>(2 * spread
* prob
);
303 if (totalBytes
>= thisSliceBytes
) {
306 *aAvailable
= std::min(thisSliceBytes
, aRemaining
);
312 ThrottleQueue::Init(uint32_t aMeanBytesPerSecond
, uint32_t aMaxBytesPerSecond
) {
313 // Can be called on any thread.
314 if (aMeanBytesPerSecond
== 0 || aMaxBytesPerSecond
== 0 ||
315 aMaxBytesPerSecond
< aMeanBytesPerSecond
) {
316 return NS_ERROR_ILLEGAL_VALUE
;
319 mMeanBytesPerSecond
= aMeanBytesPerSecond
;
320 mMaxBytesPerSecond
= aMaxBytesPerSecond
;
325 ThrottleQueue::BytesProcessed(uint64_t* aResult
) {
326 *aResult
= mBytesProcessed
;
331 ThrottleQueue::WrapStream(nsIInputStream
* aInputStream
,
332 nsIAsyncInputStream
** aResult
) {
333 nsCOMPtr
<nsIAsyncInputStream
> result
=
334 new ThrottleInputStream(aInputStream
, this);
335 result
.forget(aResult
);
340 ThrottleQueue::Notify(nsITimer
* aTimer
) {
341 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
342 // A notified reader may need to push itself back on the queue.
343 // Swap out the list of readers so that this works properly.
344 nsTArray
<RefPtr
<ThrottleInputStream
>> events
= std::move(mAsyncEvents
);
346 // Optimistically notify all the waiting readers, and then let them
347 // requeue if there isn't enough bandwidth.
348 for (size_t i
= 0; i
< events
.Length(); ++i
) {
349 events
[i
]->AllowInput();
357 ThrottleQueue::GetName(nsACString
& aName
) {
358 aName
.AssignLiteral("net::ThrottleQueue");
362 void ThrottleQueue::QueueStream(ThrottleInputStream
* aStream
) {
363 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
364 if (mAsyncEvents
.IndexOf(aStream
) ==
365 nsTArray
<RefPtr
<mozilla::net::ThrottleInputStream
>>::NoIndex
) {
366 mAsyncEvents
.AppendElement(aStream
);
370 if (mReadEvents
.Length() > 0) {
371 TimeStamp t
= mReadEvents
[0].mTime
+ TimeDuration::FromSeconds(1);
372 TimeStamp now
= TimeStamp::Now();
375 ms
= static_cast<uint32_t>((t
- now
).ToMilliseconds());
382 mTimer
->InitWithCallback(this, ms
, nsITimer::TYPE_ONE_SHOT
))) {
389 void ThrottleQueue::DequeueStream(ThrottleInputStream
* aStream
) {
390 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
391 mAsyncEvents
.RemoveElement(aStream
);
395 ThrottleQueue::GetMeanBytesPerSecond(uint32_t* aMeanBytesPerSecond
) {
396 NS_ENSURE_ARG(aMeanBytesPerSecond
);
398 *aMeanBytesPerSecond
= mMeanBytesPerSecond
;
403 ThrottleQueue::GetMaxBytesPerSecond(uint32_t* aMaxBytesPerSecond
) {
404 NS_ENSURE_ARG(aMaxBytesPerSecond
);
406 *aMaxBytesPerSecond
= mMaxBytesPerSecond
;
411 } // namespace mozilla