1 /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 /* vim:set ts=4 sts=4 sw=4 et cin: */
3 /* ***** BEGIN LICENSE BLOCK *****
4 * Version: MPL 1.1/GPL 2.0/LGPL 2.1
6 * The contents of this file are subject to the Mozilla Public License Version
7 * 1.1 (the "License"); you may not use this file except in compliance with
8 * the License. You may obtain a copy of the License at
9 * http://www.mozilla.org/MPL/
11 * Software distributed under the License is distributed on an "AS IS" basis,
12 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
13 * for the specific language governing rights and limitations under the
16 * The Original Code is mozilla.org code.
18 * The Initial Developer of the Original Code is
19 * Netscape Communications Corporation.
20 * Portions created by the Initial Developer are Copyright (C) 1998
21 * the Initial Developer. All Rights Reserved.
25 * Alternatively, the contents of this file may be used under the terms of
26 * either the GNU General Public License Version 2 or later (the "GPL"), or
27 * the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
28 * in which case the provisions of the GPL or the LGPL are applicable instead
29 * of those above. If you wish to allow use of your version of this file only
30 * under the terms of either the GPL or the LGPL, and not to allow others to
31 * use your version of this file under the terms of the MPL, indicate your
32 * decision by deleting the provisions above and replace them with the notice
33 * and other provisions required by the GPL or the LGPL. If you do not delete
34 * the provisions above, a recipient may use your version of this file under
35 * the terms of any one of the MPL, the GPL or the LGPL.
37 * ***** END LICENSE BLOCK ***** */
39 #include "nsInputStreamPump.h"
40 #include "nsIServiceManager.h"
41 #include "nsIStreamTransportService.h"
42 #include "nsIInterfaceRequestorUtils.h"
43 #include "nsISeekableStream.h"
44 #include "nsITransport.h"
45 #include "nsNetUtil.h"
46 #include "nsThreadUtils.h"
47 #include "nsNetSegmentUtils.h"
// File-scope setup. kStreamTransportServiceCID is the class ID that
// AsyncRead() hands to do_GetService() to reach the stream transport service.
51 static NS_DEFINE_CID(kStreamTransportServiceCID
, NS_STREAMTRANSPORTSERVICE_CID
);
53 #if defined(PR_LOGGING)
55 // NSPR_LOG_MODULES=nsStreamPump:5
// Log module for this file. It starts out null and is assigned in the
// nsInputStreamPump constructor from PR_NewLogModule("nsStreamPump").
57 static PRLogModuleInfo
*gStreamPumpLog
= nsnull
;
// LOG(args) forwards its argument list to PR_LOG on gStreamPumpLog at
// PR_LOG_DEBUG level.
59 #define LOG(args) PR_LOG(gStreamPumpLog, PR_LOG_DEBUG, args)
61 //-----------------------------------------------------------------------------
62 // nsInputStreamPump methods
63 //-----------------------------------------------------------------------------
// Constructor. Of the member initializers visible here, mStreamLength starts
// at LL_MaxUint(), mLoadFlags at LOAD_NORMAL and mCloseWhenDone at PR_FALSE.
// When PR_LOGGING is defined, gStreamPumpLog is assigned the "nsStreamPump"
// log module.
// NOTE(review): other initializers and any guard around the PR_NewLogModule
// call are not visible in this listing -- confirm against the full source.
65 nsInputStreamPump::nsInputStreamPump()
68 , mStreamLength(LL_MaxUint())
71 , mLoadFlags(LOAD_NORMAL
)
73 , mCloseWhenDone(PR_FALSE
)
75 #if defined(PR_LOGGING)
77 gStreamPumpLog
= PR_NewLogModule("nsStreamPump");
// Destructor.
// NOTE(review): the body is not visible in this listing, so no cleanup
// behaviour can be confirmed from here.
81 nsInputStreamPump::~nsInputStreamPump()
// Factory helper: allocates a new nsInputStreamPump, holds it in an nsRefPtr
// and forwards stream, streamPos, streamLen, segsize, segcount and
// closeWhenDone to Init().
//
// rv starts out as NS_ERROR_OUT_OF_MEMORY and is overwritten with the result
// of Init().
//
// @param result  out-parameter for the new pump -- presumably filled in on the
//                NS_SUCCEEDED branch below; TODO confirm (branch body is not
//                visible here).
// @param stream  the input stream passed through to Init().
86 nsInputStreamPump::Create(nsInputStreamPump
**result
,
87 nsIInputStream
*stream
,
94 nsresult rv
= NS_ERROR_OUT_OF_MEMORY
;
95 nsRefPtr
<nsInputStreamPump
> pump
= new nsInputStreamPump();
97 rv
= pump
->Init(stream
, streamPos
, streamLen
,
98 segsize
, segcount
, closeWhenDone
);
99 if (NS_SUCCEEDED(rv
)) {
// PeekData pairs a PeekSegmentFun callback with its opaque closure pointer so
// that both can be passed through the single void* closure argument that
// CallPeekFunc receives (CallPeekFunc casts that argument back to PeekData*).
// The constructor simply stores `fun` in mFunc and `closure` in mClosure.
108 PeekData(nsInputStreamPump::PeekSegmentFun fun
, void* closure
)
109 : mFunc(fun
), mClosure(closure
) {}
111 nsInputStreamPump::PeekSegmentFun mFunc
;
// Segment callback that PeekStream() passes to ReadSegments().
//
// Recovers the PeekData from aClosure and invokes its mFunc with mClosure,
// the segment bytes (reinterpreted as const PRUint8*) and aCount, then
// returns NS_BINDING_ABORTED.
//
// Asserts that aToOffset is 0 ("Called more than once?") and that aCount is
// non-zero ("Called without data?").
//
// NOTE(review): returning a failure code presumably makes ReadSegments stop
// after this first segment without consuming it -- confirm against the
// nsIInputStream::ReadSegments contract.
116 CallPeekFunc(nsIInputStream
*aInStream
, void *aClosure
,
117 const char *aFromSegment
, PRUint32 aToOffset
, PRUint32 aCount
,
118 PRUint32
*aWriteCount
)
120 NS_ASSERTION(aToOffset
== 0, "Called more than once?");
121 NS_ASSERTION(aCount
> 0, "Called without data?");
123 PeekData
* data
= static_cast<PeekData
*>(aClosure
);
124 data
->mFunc(data
->mClosure
,
125 reinterpret_cast<const PRUint8
*>(aFromSegment
), aCount
);
126 return NS_BINDING_ABORTED
;
// Lets `callback` inspect data on mAsyncStream by way of CallPeekFunc.
//
// Asserts that mAsyncStream is set, calls Available() on it, then wraps
// callback/closure in a PeekData and calls ReadSegments() with CallPeekFunc,
// asking for up to NET_DEFAULT_SEGMENT_SIZE bytes.
//
// @param callback  function invoked with the peeked segment.
// @param closure   opaque pointer handed back to `callback`.
// @return on the path shown, the result of ReadSegments().
// NOTE(review): how the result of Available() is handled is not visible in
// this listing -- presumably a failure is returned early; TODO confirm.
130 nsInputStreamPump::PeekStream(PeekSegmentFun callback
, void* closure
)
132 NS_ASSERTION(mAsyncStream
, "PeekStream called without stream");
134 // See if the pipe is closed by checking the return of Available.
136 nsresult rv
= mAsyncStream
->Available(&dummy
);
140 PeekData
data(callback
, closure
);
141 return mAsyncStream
->ReadSegments(CallPeekFunc
, &data
,
142 NET_DEFAULT_SEGMENT_SIZE
, &dummy
);
// Registers this pump as the callback on mAsyncStream via AsyncWait(), with
// mTargetThread as the target.
// NOTE(review): the two zero arguments are presumably the wait flags and the
// requested byte count -- confirm against nsIAsyncInputStream::AsyncWait.
// NOTE(review): the condition guarding NS_ERROR("AsyncWait failed") and the
// return statement are not visible here; presumably rv is checked and
// returned.
146 nsInputStreamPump::EnsureWaiting()
148 // no need to worry about multiple threads... an input stream pump lives
149 // on only one thread.
152 nsresult rv
= mAsyncStream
->AsyncWait(this, 0, 0, mTargetThread
);
154 NS_ERROR("AsyncWait failed");
162 //-----------------------------------------------------------------------------
163 // nsInputStreamPump::nsISupports
164 //-----------------------------------------------------------------------------
166 // although this class can only be accessed from one thread at a time, we do
167 // allow its ownership to move from thread to thread, assuming the consumer
168 // understands the limitations of this.
// Generates the nsISupports implementation for nsInputStreamPump using the
// thread-safe macro variant. nsIInputStreamCallback is one of the listed
// interfaces.
// NOTE(review): the macro name suggests three interfaces; the other two are
// not visible in this listing.
169 NS_IMPL_THREADSAFE_ISUPPORTS3(nsInputStreamPump
,
171 nsIInputStreamCallback
,
174 //-----------------------------------------------------------------------------
175 // nsInputStreamPump::nsIRequest
176 //-----------------------------------------------------------------------------
// nsIRequest name getter; `result` is an nsACString out-parameter.
// NOTE(review): the body is not visible in this listing, so what is written
// to `result` cannot be confirmed from here.
179 nsInputStreamPump::GetName(nsACString
&result
)
// Reports whether the pump is active: *result is set to true exactly when
// mState differs from STATE_IDLE.
// @param result  out-parameter receiving the pending flag.
186 nsInputStreamPump::IsPending(PRBool
*result
)
188 *result
= (mState
!= STATE_IDLE
);
// nsIRequest status getter; `status` is an out-parameter.
// NOTE(review): the body is not visible in this listing -- presumably it
// copies mStatus into *status; TODO confirm.
193 nsInputStreamPump::GetStatus(nsresult
*status
)
// Cancels the pump with the given failure status.
//
// Visible steps: log the call; if mStatus already holds a failure code, log
// " already canceled" (presumably followed by an early return -- not shown);
// assert that `status` is a failure code; close mAsyncStream with `status`;
// then act when mSuspendCount is 0.
//
// @param status  the failure code to cancel with.
// NOTE(review): the assignment of `status` to mStatus and the statement
// executed when mSuspendCount == 0 are not visible in this listing; the
// comment below implies the latter is a call to EnsureWaiting().
200 nsInputStreamPump::Cancel(nsresult status
)
202 LOG(("nsInputStreamPump::Cancel [this=%x status=%x]\n",
205 if (NS_FAILED(mStatus
)) {
206 LOG((" already canceled\n"));
210 NS_ASSERTION(NS_FAILED(status
), "cancel with non-failure status code");
213 // close input stream
215 mAsyncStream
->CloseWithStatus(status
);
216 if (mSuspendCount
== 0)
218 // Otherwise, EnsureWaiting will be called by Resume().
219 // Note that while suspended, OnInputStreamReady will
220 // not do anything, and also note that calling asyncWait
221 // on a closed stream works and will dispatch an event immediately.
// Suspends the pump. Logs the call and fails with NS_ERROR_UNEXPECTED when
// the pump is idle (mState == STATE_IDLE).
// NOTE(review): the statement that records the suspension is not visible
// here -- presumably mSuspendCount is incremented, since Resume() decrements
// it; TODO confirm.
227 nsInputStreamPump::Suspend()
229 LOG(("nsInputStreamPump::Suspend [this=%x]\n", this));
230 NS_ENSURE_TRUE(mState
!= STATE_IDLE
, NS_ERROR_UNEXPECTED
);
// Undoes one Suspend(). Fails with NS_ERROR_UNEXPECTED unless
// mSuspendCount > 0 and the pump is not idle. Decrements mSuspendCount and
// acts when it reaches zero.
// NOTE(review): the statement executed when the count reaches zero is not
// visible here; a comment in Cancel() says EnsureWaiting is called by
// Resume(), so presumably that is it.
236 nsInputStreamPump::Resume()
238 LOG(("nsInputStreamPump::Resume [this=%x]\n", this));
239 NS_ENSURE_TRUE(mSuspendCount
> 0, NS_ERROR_UNEXPECTED
);
240 NS_ENSURE_TRUE(mState
!= STATE_IDLE
, NS_ERROR_UNEXPECTED
);
242 if (--mSuspendCount
== 0)
// Load-flags getter: copies mLoadFlags into *aLoadFlags.
// @param aLoadFlags  out-parameter receiving the current flags.
248 nsInputStreamPump::GetLoadFlags(nsLoadFlags
*aLoadFlags
)
250 *aLoadFlags
= mLoadFlags
;
// Load-flags setter: stores aLoadFlags in mLoadFlags.
// @param aLoadFlags  the new flags value.
255 nsInputStreamPump::SetLoadFlags(nsLoadFlags aLoadFlags
)
257 mLoadFlags
= aLoadFlags
;
// Load-group getter: assigns mLoadGroup to *aLoadGroup and, through
// NS_IF_ADDREF, takes a reference on it when it is non-null.
// @param aLoadGroup  out-parameter receiving the load group (may be null).
262 nsInputStreamPump::GetLoadGroup(nsILoadGroup
**aLoadGroup
)
264 NS_IF_ADDREF(*aLoadGroup
= mLoadGroup
);
// Load-group setter: stores aLoadGroup in mLoadGroup.
// @param aLoadGroup  the load group to associate with this pump.
269 nsInputStreamPump::SetLoadGroup(nsILoadGroup
*aLoadGroup
)
271 mLoadGroup
= aLoadGroup
;
275 //-----------------------------------------------------------------------------
276 // nsInputStreamPump::nsIInputStreamPump implementation
277 //-----------------------------------------------------------------------------
// Configures the pump before AsyncRead(). Only allowed while the pump is
// idle; otherwise NS_ERROR_IN_PROGRESS is returned.
//
// @param streamPos      stored in mStreamOffset after an unconditional cast
//                       to PRUint64.
// @param streamLen      stored in mStreamLength only when it is >= 0; a
//                       negative value leaves mStreamLength untouched (the
//                       constructor initializes it to LL_MaxUint()).
// @param segcount       stored in mSegCount.
// @param closeWhenDone  stored in mCloseWhenDone.
// @param stream, segsize
//                       NOTE(review): the statements that store these are not
//                       visible in this listing -- presumably mStream and
//                       mSegSize, which AsyncRead() reads; TODO confirm.
//
// NOTE(review): because of the unconditional cast, a negative streamPos
// becomes a very large unsigned offset; AsyncRead() compares mStreamOffset
// with LL_MAXUINT before seeking, so presumably -1 means "do not seek".
280 nsInputStreamPump::Init(nsIInputStream
*stream
,
281 PRInt64 streamPos
, PRInt64 streamLen
,
282 PRUint32 segsize
, PRUint32 segcount
,
283 PRBool closeWhenDone
)
285 NS_ENSURE_TRUE(mState
== STATE_IDLE
, NS_ERROR_IN_PROGRESS
);
287 mStreamOffset
= PRUint64(streamPos
);
288 if (nsInt64(streamLen
) >= nsInt64(0))
289 mStreamLength
= PRUint64(streamLen
);
292 mSegCount
= segcount
;
293 mCloseWhenDone
= closeWhenDone
;
// Starts pumping data from the stream to `listener`.
//
// Visible steps, in order:
//  1. Reject the call with NS_ERROR_IN_PROGRESS unless the pump is idle, and
//     reject a null listener (NS_ENSURE_ARG_POINTER).
//  2. Ask mStream whether it is non-blocking, and query it for an async
//     interface into mAsyncStream. If that succeeds and mStreamOffset is not
//     LL_MAXUINT, seek mStream to mStreamOffset (NS_SEEK_SET) through
//     nsISeekableStream.
//  3. Otherwise use the stream transport service: create an input transport
//     over mStream with mStreamOffset, mStreamLength and mCloseWhenDone, open
//     its input stream with mSegSize/mSegCount, and query the wrapper for the
//     async interface into mAsyncStream.
//  4. Record the calling thread in mTargetThread, arm the stream with
//     EnsureWaiting(), add this request to mLoadGroup, move to STATE_START and
//     remember listener and ctxt.
// Every failing nsresult shown below is returned to the caller immediately.
//
// @param listener  receives the start/data/stop notifications.
// @param ctxt      opaque context stored in mListenerContext and passed back
//                  to the listener.
//
// NOTE(review): the conditions selecting between steps 2 and 3, the null
// checks around `seekable` and mLoadGroup, and the statements described by the
// "release our reference" and "mStreamOffset now holds" comments are not
// visible in this listing -- confirm against the full source.
299 nsInputStreamPump::AsyncRead(nsIStreamListener
*listener
, nsISupports
*ctxt
)
301 NS_ENSURE_TRUE(mState
== STATE_IDLE
, NS_ERROR_IN_PROGRESS
);
302 NS_ENSURE_ARG_POINTER(listener
);
305 // OK, we need to use the stream transport service if
307 // (1) the stream is blocking
308 // (2) the stream does not support nsIAsyncInputStream
312 nsresult rv
= mStream
->IsNonBlocking(&nonBlocking
);
313 if (NS_FAILED(rv
)) return rv
;
316 mAsyncStream
= do_QueryInterface(mStream
);
318 // if the stream supports nsIAsyncInputStream, and if we need to seek
319 // to a starting offset, then we must do so here. in the non-async
320 // stream case, the stream transport service will take care of seeking
323 if (mAsyncStream
&& (mStreamOffset
!= LL_MAXUINT
)) {
324 nsCOMPtr
<nsISeekableStream
> seekable
= do_QueryInterface(mStream
);
326 seekable
->Seek(nsISeekableStream::NS_SEEK_SET
, mStreamOffset
);
331 // ok, let's use the stream transport service to read this stream.
332 nsCOMPtr
<nsIStreamTransportService
> sts
=
333 do_GetService(kStreamTransportServiceCID
, &rv
);
334 if (NS_FAILED(rv
)) return rv
;
336 nsCOMPtr
<nsITransport
> transport
;
337 rv
= sts
->CreateInputTransport(mStream
, mStreamOffset
, mStreamLength
,
338 mCloseWhenDone
, getter_AddRefs(transport
));
339 if (NS_FAILED(rv
)) return rv
;
341 nsCOMPtr
<nsIInputStream
> wrapper
;
342 rv
= transport
->OpenInputStream(0, mSegSize
, mSegCount
, getter_AddRefs(wrapper
));
343 if (NS_FAILED(rv
)) return rv
;
345 mAsyncStream
= do_QueryInterface(wrapper
, &rv
);
346 if (NS_FAILED(rv
)) return rv
;
349 // release our reference to the original stream. from this point forward,
350 // we only reference the "stream" via mAsyncStream.
353 // mStreamOffset now holds the number of bytes currently read. we use this
354 // to enforce the mStreamLength restriction.
357 // grab event queue (we must do this here by contract, since all notifications
358 // must go to the thread which called AsyncRead)
359 mTargetThread
= do_GetCurrentThread();
360 NS_ENSURE_STATE(mTargetThread
);
362 rv
= EnsureWaiting();
363 if (NS_FAILED(rv
)) return rv
;
366 mLoadGroup
->AddRequest(this, nsnull
);
368 mState
= STATE_START
;
369 mListener
= listener
;
370 mListenerContext
= ctxt
;
374 //-----------------------------------------------------------------------------
375 // nsInputStreamPump::nsIInputStreamCallback implementation
376 //-----------------------------------------------------------------------------
// Stream-ready callback that drives the pump's state machine.
//
// Visible behaviour: the suspended/idle case (mSuspendCount != 0 or
// mState == STATE_IDLE) is tested first. Otherwise the handler for the
// current state -- OnStateStart(), OnStateTransfer() or OnStateStop() --
// produces nextState. When the state did not change and the pump is not
// suspended, the pump must be in STATE_TRANSFER with a success status (both
// asserted); it then re-arms the stream with EnsureWaiting(), storing the
// result in mStatus. nextState is set to STATE_STOP after that check.
//
// @param stream  the stream that became ready; not referenced in the lines
//                shown.
//
// NOTE(review): the loop/switch structure, the body of the suspended/idle
// branch (presumably an early exit), the statement taken when EnsureWaiting()
// succeeds, and the final state assignment/return are not visible in this
// listing -- confirm against the full source.
379 nsInputStreamPump::OnInputStreamReady(nsIAsyncInputStream
*stream
)
381 LOG(("nsInputStreamPump::OnInputStreamReady [this=%x]\n", this));
383 // this function has been called from a PLEvent, so we can safely call
384 // any listener or progress sink methods directly from here.
387 if (mSuspendCount
|| mState
== STATE_IDLE
) {
395 nextState
= OnStateStart();
398 nextState
= OnStateTransfer();
401 nextState
= OnStateStop();
405 if (mState
== nextState
&& !mSuspendCount
) {
406 NS_ASSERTION(mState
== STATE_TRANSFER
, "unexpected state");
407 NS_ASSERTION(NS_SUCCEEDED(mStatus
), "unexpected status");
410 mStatus
= EnsureWaiting();
411 if (NS_SUCCEEDED(mStatus
))
414 nextState
= STATE_STOP
;
// STATE_START handler: notifies the listener that the request has started.
//
// While mStatus is still a success code, mAsyncStream->Available() is probed
// and a failure other than NS_BASE_STREAM_CLOSED is singled out. The listener
// then gets OnStartRequest(this, mListenerContext); a failure from it is
// acted on only if mStatus is still a success code, so an earlier cancel is
// not overwritten.
//
// @return STATE_TRANSFER when mStatus is a success code, else STATE_STOP.
//
// NOTE(review): the statements executed by the two `if (NS_FAILED(rv) ...)`
// checks are not visible in this listing -- presumably each stores rv in
// mStatus; TODO confirm.
423 nsInputStreamPump::OnStateStart()
425 LOG((" OnStateStart [this=%x]\n", this));
429 // need to check the reason why the stream is ready. this is required
430 // so our listener can check our status from OnStartRequest.
431 // XXX async streams should have a GetStatus method!
432 if (NS_SUCCEEDED(mStatus
)) {
434 rv
= mAsyncStream
->Available(&avail
);
435 if (NS_FAILED(rv
) && rv
!= NS_BASE_STREAM_CLOSED
)
439 rv
= mListener
->OnStartRequest(this, mListenerContext
);
441 // an error returned from OnStartRequest should cause us to abort; however,
442 // we must not stomp on mStatus if already canceled.
443 if (NS_FAILED(rv
) && NS_SUCCEEDED(mStatus
))
446 return NS_SUCCEEDED(mStatus
) ? STATE_TRANSFER
: STATE_STOP
;
// STATE_TRANSFER handler: hands currently available data to the listener.
//
// Visible behaviour:
//  - A failed mStatus (cancel) is checked first; per the comment below, that
//    leads straight to STATE_STOP.
//  - Available() is queried on mAsyncStream. NS_BASE_STREAM_CLOSED is handled
//    as its own case. When it succeeds with a non-zero count:
//      * avail is clamped so that mStreamOffset + avail does not exceed
//        mStreamLength;
//      * if mAsyncStream is seekable, its position is captured with Tell()
//        into offsetBefore;
//      * a 32-bit offset for the listener is derived from mStreamOffset,
//        saturating at PR_UINT32_MAX;
//      * the listener's OnDataAvailable() is called;
//      * if both that call and mStatus succeeded, Tell() is taken again (a
//        failure is treated as "everything was read"). Progress advances
//        mStreamOffset by the consumed amount; no progress while not
//        suspended sets mStatus to NS_ERROR_UNEXPECTED to avoid spinning
//        forever (bug 189672);
//      * another path simply adds avail to mStreamOffset ("assume ODA behaved
//        well") -- presumably when the stream is not seekable; TODO confirm.
//  - Finally, if mStatus is still a success code, Available() is queried once
//    more and success keeps the pump in STATE_TRANSFER.
//
// @return STATE_TRANSFER on the path shown at the end; NOTE(review): the
// other return statements (presumably STATE_STOP) are not visible in this
// listing.
450 nsInputStreamPump::OnStateTransfer()
452 LOG((" OnStateTransfer [this=%x]\n", this));
454 // if canceled, go directly to STATE_STOP...
455 if (NS_FAILED(mStatus
))
461 rv
= mAsyncStream
->Available(&avail
);
462 LOG((" Available returned [stream=%x rv=%x avail=%u]\n", mAsyncStream
.get(), rv
, avail
));
464 if (rv
== NS_BASE_STREAM_CLOSED
) {
468 else if (NS_SUCCEEDED(rv
) && avail
) {
469 // figure out how much data to report (XXX detect overflow??)
470 if (PRUint64(avail
) + mStreamOffset
> mStreamLength
)
471 avail
= PRUint32(mStreamLength
- mStreamOffset
);
474 // we used to limit avail to 16K - we were afraid some ODA handlers
475 // might assume they wouldn't get more than 16K at once
476 // we're removing that limit since it speeds up local file access.
477 // Now there's an implicit 64K limit of 4 16K segments
478 // NOTE: ok, so the story is as follows. OnDataAvailable impls
479 // are by contract supposed to consume exactly |avail| bytes.
480 // however, many do not... mailnews... stream converters...
481 // cough, cough. the input stream pump is fairly tolerant
482 // in this regard; however, if an ODA does not consume any
483 // data from the stream, then we could potentially end up in
484 // an infinite loop. we do our best here to try to catch
485 // such an error. (see bug 189672)
487 // in most cases this QI will succeed (mAsyncStream is almost always
488 // a nsPipeInputStream, which implements nsISeekableStream::Tell).
489 PRInt64 offsetBefore
;
490 nsCOMPtr
<nsISeekableStream
> seekable
= do_QueryInterface(mAsyncStream
);
491 if (seekable
&& NS_FAILED(seekable
->Tell(&offsetBefore
))) {
492 NS_NOTREACHED("Tell failed on readable stream");
496 // report the current stream offset to our listener... if we've
497 // streamed more than PR_UINT32_MAX, then avoid overflowing the
498 // stream offset. it's the best we can do without a 64-bit stream
501 mStreamOffset
> PR_UINT32_MAX
?
502 PR_UINT32_MAX
: PRUint32(mStreamOffset
);
504 LOG((" calling OnDataAvailable [offset=%lld(%u) count=%u]\n",
505 mStreamOffset
, odaOffset
, avail
));
507 rv
= mListener
->OnDataAvailable(this, mListenerContext
, mAsyncStream
,
510 // don't enter this code if ODA failed or called Cancel
511 if (NS_SUCCEEDED(rv
) && NS_SUCCEEDED(mStatus
)) {
512 // test to see if this ODA failed to consume data
514 // NOTE: if Tell fails, which can happen if the stream is
515 // now closed, then we assume that everything was read.
517 if (NS_FAILED(seekable
->Tell(&offsetAfter
)))
518 offsetAfter
= offsetBefore
+ avail
;
519 if (offsetAfter
> offsetBefore
)
520 mStreamOffset
+= (offsetAfter
- offsetBefore
);
521 else if (mSuspendCount
== 0) {
523 // possible infinite loop if we continue pumping data!
525 // NOTE: although not allowed by nsIStreamListener, we
526 // will allow the ODA impl to Suspend the pump. IMAP
529 NS_ERROR("OnDataAvailable implementation consumed no data");
530 mStatus
= NS_ERROR_UNEXPECTED
;
534 mStreamOffset
+= avail
; // assume ODA behaved well
539 // an error returned from Available or OnDataAvailable should cause us to
540 // abort; however, we must not stomp on mStatus if already canceled.
542 if (NS_SUCCEEDED(mStatus
)) {
546 // if stream is now closed, advance to STATE_STOP right away.
547 // Available may return 0 bytes available at the moment; that
548 // would not mean that we are done.
549 // XXX async streams should have a GetStatus method!
550 rv
= mAsyncStream
->Available(&avail
);
551 if (NS_SUCCEEDED(rv
))
552 return STATE_TRANSFER
;
// STATE_STOP handler: tears the request down and tells the listener.
//
// Visible steps, in order: close mAsyncStream with mStatus when mStatus is a
// failure code, or with a plain Close() when mCloseWhenDone is set; clear
// mIsPending; call the listener's OnStopRequest() with mStatus; reset
// mListenerContext to 0; remove this request from mLoadGroup, passing
// mStatus.
//
// NOTE(review): releasing mAsyncStream/mListener, any null check around
// mLoadGroup, and the returned state (presumably STATE_IDLE) are not visible
// in this listing -- confirm against the full source.
559 nsInputStreamPump::OnStateStop()
561 LOG((" OnStateStop [this=%x status=%x]\n", this, mStatus
));
563 // if an error occurred, we must be sure to pass the error onto the async
564 // stream. in some cases, this is redundant, but since close is idempotent,
565 // this is OK. otherwise, be sure to honor the "close-when-done" option.
567 if (NS_FAILED(mStatus
))
568 mAsyncStream
->CloseWithStatus(mStatus
);
569 else if (mCloseWhenDone
)
570 mAsyncStream
->Close();
574 mIsPending
= PR_FALSE
;
576 mListener
->OnStopRequest(this, mListenerContext
, mStatus
);
578 mListenerContext
= 0;
581 mLoadGroup
->RemoveRequest(this, nsnull
, mStatus
);