Bug 470455 - test_database_sync_embed_visits.js leaks, r=sdwilsh
[wine-gecko.git] / netwerk / base / src / nsInputStreamPump.cpp
blob381f86871d0f3c1699e60971a8677ed3a525abe9
1 /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 /* vim:set ts=4 sts=4 sw=4 et cin: */
3 /* ***** BEGIN LICENSE BLOCK *****
4 * Version: MPL 1.1/GPL 2.0/LGPL 2.1
6 * The contents of this file are subject to the Mozilla Public License Version
7 * 1.1 (the "License"); you may not use this file except in compliance with
8 * the License. You may obtain a copy of the License at
9 * http://www.mozilla.org/MPL/
11 * Software distributed under the License is distributed on an "AS IS" basis,
12 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
13 * for the specific language governing rights and limitations under the
14 * License.
16 * The Original Code is mozilla.org code.
18 * The Initial Developer of the Original Code is
19 * Netscape Communications Corporation.
20 * Portions created by the Initial Developer are Copyright (C) 1998
21 * the Initial Developer. All Rights Reserved.
23 * Contributor(s):
25 * Alternatively, the contents of this file may be used under the terms of
26 * either the GNU General Public License Version 2 or later (the "GPL"), or
27 * the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
28 * in which case the provisions of the GPL or the LGPL are applicable instead
29 * of those above. If you wish to allow use of your version of this file only
30 * under the terms of either the GPL or the LGPL, and not to allow others to
31 * use your version of this file under the terms of the MPL, indicate your
32 * decision by deleting the provisions above and replace them with the notice
33 * and other provisions required by the GPL or the LGPL. If you do not delete
34 * the provisions above, a recipient may use your version of this file under
35 * the terms of any one of the MPL, the GPL or the LGPL.
37 * ***** END LICENSE BLOCK ***** */
39 #include "nsInputStreamPump.h"
40 #include "nsIServiceManager.h"
41 #include "nsIStreamTransportService.h"
42 #include "nsIInterfaceRequestorUtils.h"
43 #include "nsISeekableStream.h"
44 #include "nsITransport.h"
45 #include "nsNetUtil.h"
46 #include "nsThreadUtils.h"
47 #include "nsNetSegmentUtils.h"
48 #include "nsCOMPtr.h"
49 #include "prlog.h"
51 static NS_DEFINE_CID(kStreamTransportServiceCID, NS_STREAMTRANSPORTSERVICE_CID);
53 #if defined(PR_LOGGING)
55 // NSPR_LOG_MODULES=nsStreamPump:5
57 static PRLogModuleInfo *gStreamPumpLog = nsnull;
58 #endif
59 #define LOG(args) PR_LOG(gStreamPumpLog, PR_LOG_DEBUG, args)
61 //-----------------------------------------------------------------------------
62 // nsInputStreamPump methods
63 //-----------------------------------------------------------------------------
65 nsInputStreamPump::nsInputStreamPump()
66 : mState(STATE_IDLE)
67 , mStreamOffset(0)
68 , mStreamLength(LL_MaxUint())
69 , mStatus(NS_OK)
70 , mSuspendCount(0)
71 , mLoadFlags(LOAD_NORMAL)
72 , mWaiting(PR_FALSE)
73 , mCloseWhenDone(PR_FALSE)
75 #if defined(PR_LOGGING)
76 if (!gStreamPumpLog)
77 gStreamPumpLog = PR_NewLogModule("nsStreamPump");
78 #endif
81 nsInputStreamPump::~nsInputStreamPump()
85 nsresult
86 nsInputStreamPump::Create(nsInputStreamPump **result,
87 nsIInputStream *stream,
88 PRInt64 streamPos,
89 PRInt64 streamLen,
90 PRUint32 segsize,
91 PRUint32 segcount,
92 PRBool closeWhenDone)
94 nsresult rv = NS_ERROR_OUT_OF_MEMORY;
95 nsRefPtr<nsInputStreamPump> pump = new nsInputStreamPump();
96 if (pump) {
97 rv = pump->Init(stream, streamPos, streamLen,
98 segsize, segcount, closeWhenDone);
99 if (NS_SUCCEEDED(rv)) {
100 *result = nsnull;
101 pump.swap(*result);
104 return rv;
107 struct PeekData {
108 PeekData(nsInputStreamPump::PeekSegmentFun fun, void* closure)
109 : mFunc(fun), mClosure(closure) {}
111 nsInputStreamPump::PeekSegmentFun mFunc;
112 void* mClosure;
115 static NS_METHOD
116 CallPeekFunc(nsIInputStream *aInStream, void *aClosure,
117 const char *aFromSegment, PRUint32 aToOffset, PRUint32 aCount,
118 PRUint32 *aWriteCount)
120 NS_ASSERTION(aToOffset == 0, "Called more than once?");
121 NS_ASSERTION(aCount > 0, "Called without data?");
123 PeekData* data = static_cast<PeekData*>(aClosure);
124 data->mFunc(data->mClosure,
125 reinterpret_cast<const PRUint8*>(aFromSegment), aCount);
126 return NS_BINDING_ABORTED;
129 nsresult
130 nsInputStreamPump::PeekStream(PeekSegmentFun callback, void* closure)
132 NS_ASSERTION(mAsyncStream, "PeekStream called without stream");
134 // See if the pipe is closed by checking the return of Available.
135 PRUint32 dummy;
136 nsresult rv = mAsyncStream->Available(&dummy);
137 if (NS_FAILED(rv))
138 return rv;
140 PeekData data(callback, closure);
141 return mAsyncStream->ReadSegments(CallPeekFunc, &data,
142 NET_DEFAULT_SEGMENT_SIZE, &dummy);
145 nsresult
146 nsInputStreamPump::EnsureWaiting()
148 // no need to worry about multiple threads... an input stream pump lives
149 // on only one thread.
151 if (!mWaiting) {
152 nsresult rv = mAsyncStream->AsyncWait(this, 0, 0, mTargetThread);
153 if (NS_FAILED(rv)) {
154 NS_ERROR("AsyncWait failed");
155 return rv;
157 mWaiting = PR_TRUE;
159 return NS_OK;
162 //-----------------------------------------------------------------------------
163 // nsInputStreamPump::nsISupports
164 //-----------------------------------------------------------------------------
166 // although this class can only be accessed from one thread at a time, we do
167 // allow its ownership to move from thread to thread, assuming the consumer
168 // understands the limitations of this.
169 NS_IMPL_THREADSAFE_ISUPPORTS3(nsInputStreamPump,
170 nsIRequest,
171 nsIInputStreamCallback,
172 nsIInputStreamPump)
174 //-----------------------------------------------------------------------------
175 // nsInputStreamPump::nsIRequest
176 //-----------------------------------------------------------------------------
178 NS_IMETHODIMP
179 nsInputStreamPump::GetName(nsACString &result)
181 result.Truncate();
182 return NS_OK;
185 NS_IMETHODIMP
186 nsInputStreamPump::IsPending(PRBool *result)
188 *result = (mState != STATE_IDLE);
189 return NS_OK;
192 NS_IMETHODIMP
193 nsInputStreamPump::GetStatus(nsresult *status)
195 *status = mStatus;
196 return NS_OK;
199 NS_IMETHODIMP
200 nsInputStreamPump::Cancel(nsresult status)
202 LOG(("nsInputStreamPump::Cancel [this=%x status=%x]\n",
203 this, status));
205 if (NS_FAILED(mStatus)) {
206 LOG((" already canceled\n"));
207 return NS_OK;
210 NS_ASSERTION(NS_FAILED(status), "cancel with non-failure status code");
211 mStatus = status;
213 // close input stream
214 if (mAsyncStream) {
215 mAsyncStream->CloseWithStatus(status);
216 if (mSuspendCount == 0)
217 EnsureWaiting();
218 // Otherwise, EnsureWaiting will be called by Resume().
219 // Note that while suspended, OnInputStreamReady will
220 // not do anything, and also note that calling asyncWait
221 // on a closed stream works and will dispatch an event immediately.
223 return NS_OK;
226 NS_IMETHODIMP
227 nsInputStreamPump::Suspend()
229 LOG(("nsInputStreamPump::Suspend [this=%x]\n", this));
230 NS_ENSURE_TRUE(mState != STATE_IDLE, NS_ERROR_UNEXPECTED);
231 ++mSuspendCount;
232 return NS_OK;
235 NS_IMETHODIMP
236 nsInputStreamPump::Resume()
238 LOG(("nsInputStreamPump::Resume [this=%x]\n", this));
239 NS_ENSURE_TRUE(mSuspendCount > 0, NS_ERROR_UNEXPECTED);
240 NS_ENSURE_TRUE(mState != STATE_IDLE, NS_ERROR_UNEXPECTED);
242 if (--mSuspendCount == 0)
243 EnsureWaiting();
244 return NS_OK;
247 NS_IMETHODIMP
248 nsInputStreamPump::GetLoadFlags(nsLoadFlags *aLoadFlags)
250 *aLoadFlags = mLoadFlags;
251 return NS_OK;
254 NS_IMETHODIMP
255 nsInputStreamPump::SetLoadFlags(nsLoadFlags aLoadFlags)
257 mLoadFlags = aLoadFlags;
258 return NS_OK;
261 NS_IMETHODIMP
262 nsInputStreamPump::GetLoadGroup(nsILoadGroup **aLoadGroup)
264 NS_IF_ADDREF(*aLoadGroup = mLoadGroup);
265 return NS_OK;
268 NS_IMETHODIMP
269 nsInputStreamPump::SetLoadGroup(nsILoadGroup *aLoadGroup)
271 mLoadGroup = aLoadGroup;
272 return NS_OK;
275 //-----------------------------------------------------------------------------
276 // nsInputStreamPump::nsIInputStreamPump implementation
277 //-----------------------------------------------------------------------------
279 NS_IMETHODIMP
280 nsInputStreamPump::Init(nsIInputStream *stream,
281 PRInt64 streamPos, PRInt64 streamLen,
282 PRUint32 segsize, PRUint32 segcount,
283 PRBool closeWhenDone)
285 NS_ENSURE_TRUE(mState == STATE_IDLE, NS_ERROR_IN_PROGRESS);
287 mStreamOffset = PRUint64(streamPos);
288 if (nsInt64(streamLen) >= nsInt64(0))
289 mStreamLength = PRUint64(streamLen);
290 mStream = stream;
291 mSegSize = segsize;
292 mSegCount = segcount;
293 mCloseWhenDone = closeWhenDone;
295 return NS_OK;
298 NS_IMETHODIMP
299 nsInputStreamPump::AsyncRead(nsIStreamListener *listener, nsISupports *ctxt)
301 NS_ENSURE_TRUE(mState == STATE_IDLE, NS_ERROR_IN_PROGRESS);
302 NS_ENSURE_ARG_POINTER(listener);
305 // OK, we need to use the stream transport service if
307 // (1) the stream is blocking
308 // (2) the stream does not support nsIAsyncInputStream
311 PRBool nonBlocking;
312 nsresult rv = mStream->IsNonBlocking(&nonBlocking);
313 if (NS_FAILED(rv)) return rv;
315 if (nonBlocking) {
316 mAsyncStream = do_QueryInterface(mStream);
318 // if the stream supports nsIAsyncInputStream, and if we need to seek
319 // to a starting offset, then we must do so here. in the non-async
320 // stream case, the stream transport service will take care of seeking
321 // for us.
323 if (mAsyncStream && (mStreamOffset != LL_MAXUINT)) {
324 nsCOMPtr<nsISeekableStream> seekable = do_QueryInterface(mStream);
325 if (seekable)
326 seekable->Seek(nsISeekableStream::NS_SEEK_SET, mStreamOffset);
330 if (!mAsyncStream) {
331 // ok, let's use the stream transport service to read this stream.
332 nsCOMPtr<nsIStreamTransportService> sts =
333 do_GetService(kStreamTransportServiceCID, &rv);
334 if (NS_FAILED(rv)) return rv;
336 nsCOMPtr<nsITransport> transport;
337 rv = sts->CreateInputTransport(mStream, mStreamOffset, mStreamLength,
338 mCloseWhenDone, getter_AddRefs(transport));
339 if (NS_FAILED(rv)) return rv;
341 nsCOMPtr<nsIInputStream> wrapper;
342 rv = transport->OpenInputStream(0, mSegSize, mSegCount, getter_AddRefs(wrapper));
343 if (NS_FAILED(rv)) return rv;
345 mAsyncStream = do_QueryInterface(wrapper, &rv);
346 if (NS_FAILED(rv)) return rv;
349 // release our reference to the original stream. from this point forward,
350 // we only reference the "stream" via mAsyncStream.
351 mStream = 0;
353 // mStreamOffset now holds the number of bytes currently read. we use this
354 // to enforce the mStreamLength restriction.
355 mStreamOffset = 0;
357 // grab event queue (we must do this here by contract, since all notifications
358 // must go to the thread which called AsyncRead)
359 mTargetThread = do_GetCurrentThread();
360 NS_ENSURE_STATE(mTargetThread);
362 rv = EnsureWaiting();
363 if (NS_FAILED(rv)) return rv;
365 if (mLoadGroup)
366 mLoadGroup->AddRequest(this, nsnull);
368 mState = STATE_START;
369 mListener = listener;
370 mListenerContext = ctxt;
371 return NS_OK;
374 //-----------------------------------------------------------------------------
375 // nsInputStreamPump::nsIInputStreamCallback implementation
376 //-----------------------------------------------------------------------------
378 NS_IMETHODIMP
379 nsInputStreamPump::OnInputStreamReady(nsIAsyncInputStream *stream)
381 LOG(("nsInputStreamPump::OnInputStreamReady [this=%x]\n", this));
383 // this function has been called from a PLEvent, so we can safely call
384 // any listener or progress sink methods directly from here.
386 for (;;) {
387 if (mSuspendCount || mState == STATE_IDLE) {
388 mWaiting = PR_FALSE;
389 break;
392 PRUint32 nextState;
393 switch (mState) {
394 case STATE_START:
395 nextState = OnStateStart();
396 break;
397 case STATE_TRANSFER:
398 nextState = OnStateTransfer();
399 break;
400 case STATE_STOP:
401 nextState = OnStateStop();
402 break;
405 if (mState == nextState && !mSuspendCount) {
406 NS_ASSERTION(mState == STATE_TRANSFER, "unexpected state");
407 NS_ASSERTION(NS_SUCCEEDED(mStatus), "unexpected status");
409 mWaiting = PR_FALSE;
410 mStatus = EnsureWaiting();
411 if (NS_SUCCEEDED(mStatus))
412 break;
414 nextState = STATE_STOP;
417 mState = nextState;
419 return NS_OK;
422 PRUint32
423 nsInputStreamPump::OnStateStart()
425 LOG((" OnStateStart [this=%x]\n", this));
427 nsresult rv;
429 // need to check the reason why the stream is ready. this is required
430 // so our listener can check our status from OnStartRequest.
431 // XXX async streams should have a GetStatus method!
432 if (NS_SUCCEEDED(mStatus)) {
433 PRUint32 avail;
434 rv = mAsyncStream->Available(&avail);
435 if (NS_FAILED(rv) && rv != NS_BASE_STREAM_CLOSED)
436 mStatus = rv;
439 rv = mListener->OnStartRequest(this, mListenerContext);
441 // an error returned from OnStartRequest should cause us to abort; however,
442 // we must not stomp on mStatus if already canceled.
443 if (NS_FAILED(rv) && NS_SUCCEEDED(mStatus))
444 mStatus = rv;
446 return NS_SUCCEEDED(mStatus) ? STATE_TRANSFER : STATE_STOP;
449 PRUint32
450 nsInputStreamPump::OnStateTransfer()
452 LOG((" OnStateTransfer [this=%x]\n", this));
454 // if canceled, go directly to STATE_STOP...
455 if (NS_FAILED(mStatus))
456 return STATE_STOP;
458 nsresult rv;
460 PRUint32 avail;
461 rv = mAsyncStream->Available(&avail);
462 LOG((" Available returned [stream=%x rv=%x avail=%u]\n", mAsyncStream.get(), rv, avail));
464 if (rv == NS_BASE_STREAM_CLOSED) {
465 rv = NS_OK;
466 avail = 0;
468 else if (NS_SUCCEEDED(rv) && avail) {
469 // figure out how much data to report (XXX detect overflow??)
470 if (PRUint64(avail) + mStreamOffset > mStreamLength)
471 avail = PRUint32(mStreamLength - mStreamOffset);
473 if (avail) {
474 // we used to limit avail to 16K - we were afraid some ODA handlers
475 // might assume they wouldn't get more than 16K at once
476 // we're removing that limit since it speeds up local file access.
477 // Now there's an implicit 64K limit of 4 16K segments
478 // NOTE: ok, so the story is as follows. OnDataAvailable impls
479 // are by contract supposed to consume exactly |avail| bytes.
480 // however, many do not... mailnews... stream converters...
481 // cough, cough. the input stream pump is fairly tolerant
482 // in this regard; however, if an ODA does not consume any
483 // data from the stream, then we could potentially end up in
484 // an infinite loop. we do our best here to try to catch
485 // such an error. (see bug 189672)
487 // in most cases this QI will succeed (mAsyncStream is almost always
488 // a nsPipeInputStream, which implements nsISeekableStream::Tell).
489 PRInt64 offsetBefore;
490 nsCOMPtr<nsISeekableStream> seekable = do_QueryInterface(mAsyncStream);
491 if (seekable && NS_FAILED(seekable->Tell(&offsetBefore))) {
492 NS_NOTREACHED("Tell failed on readable stream");
493 offsetBefore = 0;
496 // report the current stream offset to our listener... if we've
497 // streamed more than PR_UINT32_MAX, then avoid overflowing the
498 // stream offset. it's the best we can do without a 64-bit stream
499 // listener API.
500 PRUint32 odaOffset =
501 mStreamOffset > PR_UINT32_MAX ?
502 PR_UINT32_MAX : PRUint32(mStreamOffset);
504 LOG((" calling OnDataAvailable [offset=%lld(%u) count=%u]\n",
505 mStreamOffset, odaOffset, avail));
507 rv = mListener->OnDataAvailable(this, mListenerContext, mAsyncStream,
508 odaOffset, avail);
510 // don't enter this code if ODA failed or called Cancel
511 if (NS_SUCCEEDED(rv) && NS_SUCCEEDED(mStatus)) {
512 // test to see if this ODA failed to consume data
513 if (seekable) {
514 // NOTE: if Tell fails, which can happen if the stream is
515 // now closed, then we assume that everything was read.
516 PRInt64 offsetAfter;
517 if (NS_FAILED(seekable->Tell(&offsetAfter)))
518 offsetAfter = offsetBefore + avail;
519 if (offsetAfter > offsetBefore)
520 mStreamOffset += (offsetAfter - offsetBefore);
521 else if (mSuspendCount == 0) {
523 // possible infinite loop if we continue pumping data!
525 // NOTE: although not allowed by nsIStreamListener, we
526 // will allow the ODA impl to Suspend the pump. IMAP
527 // does this :-(
529 NS_ERROR("OnDataAvailable implementation consumed no data");
530 mStatus = NS_ERROR_UNEXPECTED;
533 else
534 mStreamOffset += avail; // assume ODA behaved well
539 // an error returned from Available or OnDataAvailable should cause us to
540 // abort; however, we must not stomp on mStatus if already canceled.
542 if (NS_SUCCEEDED(mStatus)) {
543 if (NS_FAILED(rv))
544 mStatus = rv;
545 else if (avail) {
546 // if stream is now closed, advance to STATE_STOP right away.
547 // Available may return 0 bytes available at the moment; that
548 // would not mean that we are done.
549 // XXX async streams should have a GetStatus method!
550 rv = mAsyncStream->Available(&avail);
551 if (NS_SUCCEEDED(rv))
552 return STATE_TRANSFER;
555 return STATE_STOP;
558 PRUint32
559 nsInputStreamPump::OnStateStop()
561 LOG((" OnStateStop [this=%x status=%x]\n", this, mStatus));
563 // if an error occured, we must be sure to pass the error onto the async
564 // stream. in some cases, this is redundant, but since close is idempotent,
565 // this is OK. otherwise, be sure to honor the "close-when-done" option.
567 if (NS_FAILED(mStatus))
568 mAsyncStream->CloseWithStatus(mStatus);
569 else if (mCloseWhenDone)
570 mAsyncStream->Close();
572 mAsyncStream = 0;
573 mTargetThread = 0;
574 mIsPending = PR_FALSE;
576 mListener->OnStopRequest(this, mListenerContext, mStatus);
577 mListener = 0;
578 mListenerContext = 0;
580 if (mLoadGroup)
581 mLoadGroup->RemoveRequest(this, nsnull, mStatus);
583 return STATE_IDLE;