1 /* vim:set ts=4 sw=4 sts=4 et cin: */
2 /* ***** BEGIN LICENSE BLOCK *****
3 * Version: MPL 1.1/GPL 2.0/LGPL 2.1
5 * The contents of this file are subject to the Mozilla Public License Version
6 * 1.1 (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 * http://www.mozilla.org/MPL/
10 * Software distributed under the License is distributed on an "AS IS" basis,
11 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
12 * for the specific language governing rights and limitations under the
15 * The Original Code is Mozilla.
17 * The Initial Developer of the Original Code is
18 * Netscape Communications Corporation.
19 * Portions created by the Initial Developer are Copyright (C) 2002
20 * the Initial Developer. All Rights Reserved.
23 * Darin Fisher <darin@netscape.com>
25 * Alternatively, the contents of this file may be used under the terms of
26 * either the GNU General Public License Version 2 or later (the "GPL"), or
27 * the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
28 * in which case the provisions of the GPL or the LGPL are applicable instead
29 * of those above. If you wish to allow use of your version of this file only
30 * under the terms of either the GPL or the LGPL, and not to allow others to
31 * use your version of this file under the terms of the MPL, indicate your
32 * decision by deleting the provisions above and replace them with the notice
33 * and other provisions required by the GPL or the LGPL. If you do not delete
34 * the provisions above, a recipient may use your version of this file under
35 * the terms of any one of the MPL, the GPL or the LGPL.
37 * ***** END LICENSE BLOCK ***** */
39 #include "nsStreamUtils.h"
42 #include "nsIEventTarget.h"
43 #include "nsIRunnable.h"
44 #include "nsAutoLock.h"
47 //-----------------------------------------------------------------------------
// Event object that relays an input-stream-ready notification through an
// event target: OnInputStreamReady() dispatches this object to mTarget, and
// mCallback->OnInputStreamReady(mStream) is invoked further down (presumably
// from the nsIRunnable Run() method, whose signature is not visible here).
// NOTE(review): several original lines (braces, access specifiers, some
// statements) are absent from this excerpt.
49 class nsInputStreamReadyEvent
: public nsIRunnable
50 , public nsIInputStreamCallback
// Constructor: takes the callback to notify and the event target to use.
55 nsInputStreamReadyEvent(nsIInputStreamCallback
*callback
,
56 nsIEventTarget
*target
)
// Destructor: may have to hand mCallback over to mTarget's thread, as the
// comment below explains.
63 ~nsInputStreamReadyEvent()
68 // whoa!! looks like we never posted this event. take care to
69 // release mCallback on the correct thread. if mTarget lives on the
70 // calling thread, then we are ok. otherwise, we have to try to
71 // proxy the Release over the right thread. if that thread is dead,
72 // then there's nothing we can do... better to leak than crash.
// Ask mTarget whether it runs on the calling thread; |val| gets the answer.
75 nsresult rv
= mTarget
->IsOnCurrentThread(&val
);
// The query failed or mTarget is on another thread: wrap mCallback in a
// fresh ready-event bound to the same target.
76 if (NS_FAILED(rv
) || !val
) {
77 nsCOMPtr
<nsIInputStreamCallback
> event
;
78 NS_NewInputStreamReadyEvent(getter_AddRefs(event
), mCallback
,
// Calling OnInputStreamReady on the new event dispatches it to mTarget
// (see OnInputStreamReady below); no stream is supplied.
82 rv
= event
->OnInputStreamReady(nsnull
);
// NOTE(review): presumably reached only when that call failed; the raw
// pointer below looks like the start of a deliberate leak -- confirm against
// the lines missing from this excerpt.
84 NS_NOTREACHED("leaking stream event");
85 nsISupports
*sup
= event
;
// nsIInputStreamCallback entry point: dispatches this object to mTarget.
// The warning and NS_ERROR_FAILURE return are presumably guarded by a
// failure check that is not visible here.
93 NS_IMETHOD
OnInputStreamReady(nsIAsyncInputStream
*stream
)
98 mTarget
->Dispatch(this, NS_DISPATCH_NORMAL
);
100 NS_WARNING("Dispatch failed");
101 return NS_ERROR_FAILURE
;
// Deliver the stored stream to the wrapped callback.
111 mCallback
->OnInputStreamReady(mStream
);
// Stream handed on to mCallback.
118 nsCOMPtr
<nsIAsyncInputStream
> mStream
;
// Callback that ultimately receives the notification.
119 nsCOMPtr
<nsIInputStreamCallback
> mCallback
;
// Event target this object is dispatched to.
120 nsCOMPtr
<nsIEventTarget
> mTarget
;
// nsISupports implementation for nsInputStreamReadyEvent (thread-safe
// variant, per the macro name), exposing nsIRunnable and
// nsIInputStreamCallback.
123 NS_IMPL_THREADSAFE_ISUPPORTS2(nsInputStreamReadyEvent
, nsIRunnable
,
124 nsIInputStreamCallback
)
126 //-----------------------------------------------------------------------------
// Event object that relays an output-stream-ready notification through an
// event target: OnOutputStreamReady() dispatches this object to mTarget, and
// mCallback->OnOutputStreamReady(mStream) is invoked further down (presumably
// from the nsIRunnable Run() method, whose signature is not visible here).
// NOTE(review): several original lines (braces, access specifiers, some
// statements) are absent from this excerpt.
128 class nsOutputStreamReadyEvent
: public nsIRunnable
129 , public nsIOutputStreamCallback
// Constructor: stores the callback to notify; the remaining initializers
// are not visible in this excerpt.
134 nsOutputStreamReadyEvent(nsIOutputStreamCallback
*callback
,
135 nsIEventTarget
*target
)
136 : mCallback(callback
)
// Destructor: may have to hand mCallback over to mTarget's thread, as the
// comment below explains.
142 ~nsOutputStreamReadyEvent()
147 // whoa!! looks like we never posted this event. take care to
148 // release mCallback on the correct thread. if mTarget lives on the
149 // calling thread, then we are ok. otherwise, we have to try to
150 // proxy the Release over the right thread. if that thread is dead,
151 // then there's nothing we can do... better to leak than crash.
// Ask mTarget whether it runs on the calling thread; |val| gets the answer.
154 nsresult rv
= mTarget
->IsOnCurrentThread(&val
);
// The query failed or mTarget is on another thread: wrap mCallback in a
// fresh ready-event bound to the same target.
155 if (NS_FAILED(rv
) || !val
) {
156 nsCOMPtr
<nsIOutputStreamCallback
> event
;
157 NS_NewOutputStreamReadyEvent(getter_AddRefs(event
), mCallback
,
// Calling OnOutputStreamReady on the new event dispatches it to mTarget
// (see OnOutputStreamReady below); no stream is supplied.
161 rv
= event
->OnOutputStreamReady(nsnull
);
// NOTE(review): presumably reached only when that call failed; the raw
// pointer below looks like the start of a deliberate leak -- confirm against
// the lines missing from this excerpt.
163 NS_NOTREACHED("leaking stream event");
164 nsISupports
*sup
= event
;
// nsIOutputStreamCallback entry point: dispatches this object to mTarget.
// The warning and NS_ERROR_FAILURE return are presumably guarded by a
// failure check that is not visible here.
172 NS_IMETHOD
OnOutputStreamReady(nsIAsyncOutputStream
*stream
)
177 mTarget
->Dispatch(this, NS_DISPATCH_NORMAL
);
// The warning names the call that is actually made (Dispatch), matching
// nsInputStreamReadyEvent.
179 NS_WARNING("Dispatch failed");
180 return NS_ERROR_FAILURE
;
// Deliver the stored stream to the wrapped callback.
190 mCallback
->OnOutputStreamReady(mStream
);
// Stream handed on to mCallback.
197 nsCOMPtr
<nsIAsyncOutputStream
> mStream
;
// Callback that ultimately receives the notification.
198 nsCOMPtr
<nsIOutputStreamCallback
> mCallback
;
// Event target this object is dispatched to.
199 nsCOMPtr
<nsIEventTarget
> mTarget
;
// nsISupports implementation for nsOutputStreamReadyEvent (thread-safe
// variant, per the macro name), exposing nsIRunnable and
// nsIOutputStreamCallback.
202 NS_IMPL_THREADSAFE_ISUPPORTS2(nsOutputStreamReadyEvent
, nsIRunnable
,
203 nsIOutputStreamCallback
)
205 //-----------------------------------------------------------------------------
// Creates an nsInputStreamReadyEvent wrapping |callback| and |target| and
// returns it, with a reference added, through |event|.
//   event    - out: receives the new event object.
//   callback - callback the event will notify; asserted non-null.
//   target   - event target the event dispatches to; asserted non-null.
// NS_ERROR_OUT_OF_MEMORY is returned on what is presumably the
// allocation-failure path (the check itself is not visible in this excerpt).
208 NS_NewInputStreamReadyEvent(nsIInputStreamCallback
**event
,
209 nsIInputStreamCallback
*callback
,
210 nsIEventTarget
*target
)
212 NS_ASSERTION(callback
, "null callback");
213 NS_ASSERTION(target
, "null target");
214 nsInputStreamReadyEvent
*ev
= new nsInputStreamReadyEvent(callback
, target
);
216 return NS_ERROR_OUT_OF_MEMORY
;
// Store the new object in the out-param and take a reference for the caller.
217 NS_ADDREF(*event
= ev
);
// Creates an nsOutputStreamReadyEvent wrapping |callback| and |target| and
// returns it, with a reference added, through |event|.
//   event    - out: receives the new event object.
//   callback - callback the event will notify; asserted non-null.
//   target   - event target the event dispatches to; asserted non-null.
// NS_ERROR_OUT_OF_MEMORY is returned on what is presumably the
// allocation-failure path (the check itself is not visible in this excerpt).
222 NS_NewOutputStreamReadyEvent(nsIOutputStreamCallback
**event
,
223 nsIOutputStreamCallback
*callback
,
224 nsIEventTarget
*target
)
226 NS_ASSERTION(callback
, "null callback");
227 NS_ASSERTION(target
, "null target");
228 nsOutputStreamReadyEvent
*ev
= new nsOutputStreamReadyEvent(callback
, target
);
230 return NS_ERROR_OUT_OF_MEMORY
;
// Store the new object in the out-param and take a reference for the caller.
231 NS_ADDREF(*event
= ev
);
235 //-----------------------------------------------------------------------------
236 // NS_AsyncCopy implementation
238 // abstract stream copier...
// Abstract base class for the asynchronous stream copiers created by
// NS_AsyncCopy.  It implements both stream-ready callback interfaces so it
// can pass itself to AsyncWait() on the source and on the sink; subclasses
// supply the actual data transfer through DoCopy().
// NOTE(review): a number of original lines (braces, access specifiers, some
// statements and conditions) are absent from this excerpt.
239 class nsAStreamCopier
: public nsIInputStreamCallback
240 , public nsIOutputStreamCallback
// Constructor initializers: both event-state flags start out false.
251 , mEventInProcess(PR_FALSE
)
252 , mEventIsPending(PR_FALSE
)
256 // virtual since subclasses call superclass Release()
257 virtual ~nsAStreamCopier()
// Releases the lock created in Start().
260 PR_DestroyLock(mLock
);
263 // kick off the async copy...
// Records the copy parameters, creates mLock, looks up the async interfaces
// of the source and sink, and posts the first continuation event.  The
// trailing parameters (closure and chunk size, judging by the call made in
// NS_AsyncCopy) are on lines missing from this excerpt.
264 nsresult
Start(nsIInputStream
*source
,
265 nsIOutputStream
*sink
,
266 nsIEventTarget
*target
,
267 nsAsyncCopyCallbackFun callback
,
274 mCallback
= callback
;
276 mChunkSize
= chunksize
;
// Lock taken (via nsAutoLock) around the event-state flags below.
278 mLock
= PR_NewLock();
// NOTE(review): presumably returned only when PR_NewLock() fails; the check
// is not visible here.
280 return NS_ERROR_OUT_OF_MEMORY
;
// mAsyncSource / mAsyncSink are tested for null before being waited on in
// the copy step below.
282 mAsyncSource
= do_QueryInterface(mSource
);
283 mAsyncSink
= do_QueryInterface(mSink
);
285 return PostContinuationEvent();
288 // implemented by subclasses, returns number of bytes copied and
289 // sets source and sink condition before returning.
290 virtual PRUint32
DoCopy(nsresult
*sourceCondition
, nsresult
*sinkCondition
) = 0;
// ---- copy step (its signature is not visible in this excerpt) ----
// Guard on a missing source or sink; the statement it controls is not
// visible here.
294 if (!mSource
|| !mSink
)
297 nsresult sourceCondition
, sinkCondition
;
299 // ok, copy data from source to sink.
301 PRUint32 n
= DoCopy(&sourceCondition
, &sinkCondition
);
// Either side reported failure, or nothing was copied: decide whether to
// wait for one of the streams or to finish the copy.
302 if (NS_FAILED(sourceCondition
) || NS_FAILED(sinkCondition
) || n
== 0) {
303 if (sourceCondition
== NS_BASE_STREAM_WOULD_BLOCK
&& mAsyncSource
) {
304 // need to wait for more data from source. while waiting for
305 // more source data, be sure to observe failures on output end.
306 mAsyncSource
->AsyncWait(this, 0, 0, nsnull
);
309 mAsyncSink
->AsyncWait(this,
310 nsIAsyncOutputStream::WAIT_CLOSURE_ONLY
,
313 else if (sinkCondition
== NS_BASE_STREAM_WOULD_BLOCK
&& mAsyncSink
) {
314 // need to wait for more room in the sink. while waiting for
315 // more room in the sink, be sure to observe failures on the
317 mAsyncSink
->AsyncWait(this, 0, 0, nsnull
);
320 mAsyncSource
->AsyncWait(this,
321 nsIAsyncInputStream::WAIT_CLOSURE_ONLY
,
// Finishing: each async stream is closed with the other side's condition.
327 mAsyncSource
->CloseWithStatus(sinkCondition
);
330 mAsyncSource
= nsnull
;
335 mAsyncSink
->CloseWithStatus(sourceCondition
);
341 // notify state complete...
// The reported status is the source condition, unless that succeeded, in
// which case the sink condition is used.
343 nsresult status
= sourceCondition
;
344 if (NS_SUCCEEDED(status
))
345 status
= sinkCondition
;
// NS_BASE_STREAM_CLOSED gets special handling on a line not visible here.
346 if (status
== NS_BASE_STREAM_CLOSED
)
348 mCallback(mClosure
, status
);
// Source became ready (or closed): request another copy step.
356 NS_IMETHOD
OnInputStreamReady(nsIAsyncInputStream
*source
)
358 PostContinuationEvent();
// Sink became ready (or closed): request another copy step.
362 NS_IMETHOD
OnOutputStreamReady(nsIAsyncOutputStream
*sink
)
364 PostContinuationEvent();
368 // continuation event handler
373 // clear "in process" flag and post any pending continuation event
374 nsAutoLock
lock(mLock
);
375 mEventInProcess
= PR_FALSE
;
376 if (mEventIsPending
) {
377 mEventIsPending
= PR_FALSE
;
378 PostContinuationEvent_Locked();
// Takes mLock and posts (or records as pending) a continuation event.
384 nsresult
PostContinuationEvent()
386 // we cannot post a continuation event if there is currently
387 // an event in process. doing so could result in Process being
388 // run simultaneously on multiple threads, so we mark the event
389 // as pending, and if an event is already in process then we
390 // just let that existing event take care of posting the real
391 // continuation event.
393 nsAutoLock
lock(mLock
);
394 return PostContinuationEvent_Locked();
// Variant for callers that already hold mLock.
397 nsresult
PostContinuationEvent_Locked()
// NOTE(review): presumably set only when an event is already in process
// (the condition is not visible here); otherwise the event is dispatched.
401 mEventIsPending
= PR_TRUE
;
// Hand this copier to mTarget; on success an event is now in process.
403 rv
= mTarget
->Dispatch(this, NS_DISPATCH_NORMAL
);
404 if (NS_SUCCEEDED(rv
))
405 mEventInProcess
= PR_TRUE
;
407 NS_WARNING("unable to post continuation event");
// Streams being copied, plus their async interfaces (used for AsyncWait and
// CloseWithStatus above).
413 nsCOMPtr
<nsIInputStream
> mSource
;
414 nsCOMPtr
<nsIOutputStream
> mSink
;
415 nsCOMPtr
<nsIAsyncInputStream
> mAsyncSource
;
416 nsCOMPtr
<nsIAsyncOutputStream
> mAsyncSink
;
// Target that continuation events are dispatched to.
417 nsCOMPtr
<nsIEventTarget
> mTarget
;
// Completion callback, invoked with mClosure and the final status.
419 nsAsyncCopyCallbackFun mCallback
;
// Set once a continuation event has been dispatched; cleared by the handler.
422 PRPackedBool mEventInProcess
;
// Set when another continuation is wanted; the handler re-posts and clears it.
423 PRPackedBool mEventIsPending
;
// nsISupports implementation for nsAStreamCopier (thread-safe variant, per
// the macro name).  NOTE(review): the third interface argument and the
// closing parenthesis are on a line missing from this excerpt.
426 NS_IMPL_THREADSAFE_ISUPPORTS3(nsAStreamCopier
,
427 nsIInputStreamCallback
,
428 nsIOutputStreamCallback
,
// Copier that drives the transfer from the input side: DoCopy() calls
// mSource->ReadSegments() and each segment is written to the sink by
// ConsumeInputBuffer().
// NOTE(review): several original lines (braces, access specifiers, some
// statements) are absent from this excerpt.
431 class nsStreamCopierIB
: public nsAStreamCopier
434 nsStreamCopierIB() : nsAStreamCopier() {}
435 virtual ~nsStreamCopierIB() {}
// State shared between DoCopy() and the ReadSegments callback.
437 struct ReadSegmentsState
{
// Sink the callback writes to.
438 nsIOutputStream
*mSink
;
// Outcome of the most recent write, reported back by DoCopy().
439 nsresult mSinkCondition
;
// ReadSegments callback: writes one source segment to state->mSink and
// returns the resulting sink condition.
442 static NS_METHOD
ConsumeInputBuffer(nsIInputStream
*inStr
,
447 PRUint32
*countWritten
)
// Named cast instead of a C-style cast, matching the other segment
// callbacks in this file.
449 ReadSegmentsState
*state
= static_cast<ReadSegmentsState
*>(closure
);
451 nsresult rv
= state
->mSink
->Write(buffer
, count
, countWritten
);
// NOTE(review): presumably recorded when the write failed (the condition is
// not visible here); a write of zero bytes is treated as a closed sink.
453 state
->mSinkCondition
= rv
;
454 else if (*countWritten
== 0)
455 state
->mSinkCondition
= NS_BASE_STREAM_CLOSED
;
457 return state
->mSinkCondition
;
// Copies up to mChunkSize bytes by letting the source push segments into
// ConsumeInputBuffer; the sink condition comes from the shared state.
460 PRUint32
DoCopy(nsresult
*sourceCondition
, nsresult
*sinkCondition
)
462 ReadSegmentsState state
;
464 state
.mSinkCondition
= NS_OK
;
468 mSource
->ReadSegments(ConsumeInputBuffer
, &state
, mChunkSize
, &n
);
469 *sinkCondition
= state
.mSinkCondition
;
// Copier that drives the transfer from the output side: DoCopy() calls
// mSink->WriteSegments() and each segment is filled from the source by
// FillOutputBuffer().
// NOTE(review): several original lines (braces, access specifiers, some
// statements) are absent from this excerpt.
474 class nsStreamCopierOB
: public nsAStreamCopier
477 nsStreamCopierOB() : nsAStreamCopier() {}
478 virtual ~nsStreamCopierOB() {}
// State shared between DoCopy() and the WriteSegments callback.
480 struct WriteSegmentsState
{
// Source the callback reads from.
481 nsIInputStream
*mSource
;
// Outcome of the most recent read, reported back by DoCopy().
482 nsresult mSourceCondition
;
// WriteSegments callback: fills one sink segment from state->mSource and
// returns the resulting source condition.
485 static NS_METHOD
FillOutputBuffer(nsIOutputStream
*outStr
,
// Named cast instead of a C-style cast, matching the other segment
// callbacks in this file.
492 WriteSegmentsState
*state
= static_cast<WriteSegmentsState
*>(closure
);
494 nsresult rv
= state
->mSource
->Read(buffer
, count
, countRead
);
// NOTE(review): presumably recorded when the read failed (the condition is
// not visible here); a read of zero bytes is treated as a closed source.
496 state
->mSourceCondition
= rv
;
497 else if (*countRead
== 0)
498 state
->mSourceCondition
= NS_BASE_STREAM_CLOSED
;
500 return state
->mSourceCondition
;
// Copies up to mChunkSize bytes by letting the sink pull segments through
// FillOutputBuffer; the source condition comes from the shared state.
503 PRUint32
DoCopy(nsresult
*sourceCondition
, nsresult
*sinkCondition
)
505 WriteSegmentsState state
;
506 state
.mSource
= mSource
;
507 state
.mSourceCondition
= NS_OK
;
511 mSink
->WriteSegments(FillOutputBuffer
, &state
, mChunkSize
, &n
);
512 *sourceCondition
= state
.mSourceCondition
;
517 //-----------------------------------------------------------------------------
// Starts an asynchronous copy from |source| to |sink|, with continuation
// events dispatched to |target| (asserted non-null).  |mode| selects the
// copier: nsStreamCopierIB for NS_ASYNCCOPY_VIA_READSEGMENTS, otherwise
// nsStreamCopierOB.  The copier is then started with the callback, closure
// and chunk size.
// NOTE(review): the return type, some parameters and several statements are
// on lines missing from this excerpt.
520 NS_AsyncCopy(nsIInputStream
*source
,
521 nsIOutputStream
*sink
,
522 nsIEventTarget
*target
,
523 nsAsyncCopyMode mode
,
525 nsAsyncCopyCallbackFun callback
,
528 NS_ASSERTION(target
, "non-null target required");
531 nsAStreamCopier
*copier
;
533 if (mode
== NS_ASYNCCOPY_VIA_READSEGMENTS
)
534 copier
= new nsStreamCopierIB();
536 copier
= new nsStreamCopierOB();
// NOTE(review): presumably the allocation-failure path; the check is not
// visible here.
539 return NS_ERROR_OUT_OF_MEMORY
;
541 // Start() takes an owning ref to the copier...
543 rv
= copier
->Start(source
, sink
, target
, callback
, closure
, chunkSize
);
549 //-----------------------------------------------------------------------------
// Reads data from |stream| and appends it to |result|.  Visible steps: ask
// the stream how much is available, compare that against |maxCount|, grow
// |result| by |avail| bytes, read into the newly added tail, then set the
// length to what was actually read.
// NOTE(review): the return type, the loop structure and several checks are
// on lines missing from this excerpt.
552 NS_ConsumeStream(nsIInputStream
*stream
, PRUint32 maxCount
, nsACString
&result
)
559 rv
= stream
->Available(&avail
);
// A closed stream is handled specially on a line not visible here.
561 if (rv
== NS_BASE_STREAM_CLOSED
)
// NOTE(review): presumably clamps |avail| to |maxCount|; the controlled
// statement is not visible here.
567 if (avail
> maxCount
)
570 // resize result buffer
571 PRUint32 length
= result
.Length();
572 result
.SetLength(length
+ avail
);
// If the length did not change as requested, report out-of-memory.
573 if (result
.Length() != (length
+ avail
))
574 return NS_ERROR_OUT_OF_MEMORY
;
// Write position: just past the data |result| already held.
575 char *buf
= result
.BeginWriting() + length
;
578 rv
= stream
->Read(buf
, avail
, &n
);
// Trim |result| to the previous length plus the |n| bytes reported by Read().
582 result
.SetLength(length
+ n
);
591 //-----------------------------------------------------------------------------
// ReadSegments callback used by NS_InputStreamIsBuffered as a probe.
// |closure| points at a PRBool; the callback returns NS_ERROR_ABORT.
// NOTE(review): presumably sets *result on a line missing from this
// excerpt -- confirm.
594 TestInputStream(nsIInputStream
*inStr
,
599 PRUint32
*countWritten
)
601 PRBool
*result
= static_cast<PRBool
*>(closure
);
603 return NS_ERROR_ABORT
; // don't call me anymore
// Probes |stream| by calling ReadSegments() with TestInputStream.  Returns
// true when |result| ended up set or ReadSegments() itself succeeded.
// NOTE(review): the remaining ReadSegments arguments are on a line missing
// from this excerpt.
607 NS_InputStreamIsBuffered(nsIInputStream
*stream
)
609 PRBool result
= PR_FALSE
;
611 nsresult rv
= stream
->ReadSegments(TestInputStream
,
613 return result
|| NS_SUCCEEDED(rv
);
// WriteSegments callback used by NS_OutputStreamIsBuffered as a probe.
// |closure| points at a PRBool; the callback returns NS_ERROR_ABORT.
// NOTE(review): presumably sets *result on a line missing from this
// excerpt -- confirm.
617 TestOutputStream(nsIOutputStream
*outStr
,
624 PRBool
*result
= static_cast<PRBool
*>(closure
);
626 return NS_ERROR_ABORT
; // don't call me anymore
// Probes |stream| by calling WriteSegments() with TestOutputStream, passing
// the address of |result| as closure and a count of 1.
// NOTE(review): the return statement is on a line missing from this excerpt.
630 NS_OutputStreamIsBuffered(nsIOutputStream
*stream
)
632 PRBool result
= PR_FALSE
;
634 stream
->WriteSegments(TestOutputStream
, &result
, 1, &n
);
638 //-----------------------------------------------------------------------------
// Segment callback that forwards the segment to an output stream: |closure|
// is the nsIOutputStream, and |buffer| / |count| are passed to its Write().
// NOTE(review): the surrounding loop and the bookkeeping of *countWritten
// are on lines missing from this excerpt.
641 NS_CopySegmentToStream(nsIInputStream
*inStr
,
646 PRUint32
*countWritten
)
648 nsIOutputStream
*outStr
= static_cast<nsIOutputStream
*>(closure
);
652 nsresult rv
= outStr
->Write(buffer
, count
, &n
);
// Segment callback that copies the segment into a caller-supplied buffer:
// |closure| is the destination char buffer, and |count| bytes from |buffer|
// are copied to position |offset| within it.  Reports all |count| bytes as
// consumed through |countWritten|.
663 NS_CopySegmentToBuffer(nsIInputStream
*inStr
,
668 PRUint32
*countWritten
)
670 char *toBuf
= static_cast<char *>(closure
);
671 memcpy(&toBuf
[offset
], buffer
, count
);
672 *countWritten
= count
;
// Segment callback that throws the data away: it reports all |count| bytes
// as consumed through |countWritten| without touching the segment.
677 NS_DiscardSegment(nsIInputStream
*inStr
,
682 PRUint32
*countWritten
)
684 *countWritten
= count
;
688 //-----------------------------------------------------------------------------
691 NS_WriteSegmentThunk(nsIInputStream
*inStr
,
696 PRUint32
*countWritten
)
698 nsWriteSegmentThunk
*thunk
= static_cast<nsWriteSegmentThunk
*>(closure
);
699 return thunk
->mFun(thunk
->mStream
, thunk
->mClosure
, buffer
, offset
, count
,