1 /* ***** BEGIN LICENSE BLOCK *****
2 * Version: MPL 1.1/GPL 2.0/LGPL 2.1
4 * The contents of this file are subject to the Mozilla Public License Version
5 * 1.1 (the "License"); you may not use this file except in compliance with
6 * the License. You may obtain a copy of the License at
7 * http://www.mozilla.org/MPL/
9 * Software distributed under the License is distributed on an "AS IS" basis,
10 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
11 * for the specific language governing rights and limitations under the
14 * The Original Code is Mozilla.
16 * The Initial Developer of the Original Code is
17 * Netscape Communications Corporation.
18 * Portions created by the Initial Developer are Copyright (C) 2002
19 * the Initial Developer. All Rights Reserved.
22 * Darin Fisher <darin@netscape.com>
24 * Alternatively, the contents of this file may be used under the terms of
25 * either the GNU General Public License Version 2 or later (the "GPL"), or
26 * the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
27 * in which case the provisions of the GPL or the LGPL are applicable instead
28 * of those above. If you wish to allow use of your version of this file only
29 * under the terms of either the GPL or the LGPL, and not to allow others to
30 * use your version of this file under the terms of the MPL, indicate your
31 * decision by deleting the provisions above and replace them with the notice
32 * and other provisions required by the GPL or the LGPL. If you do not delete
33 * the provisions above, a recipient may use your version of this file under
34 * the terms of any one of the MPL, the GPL or the LGPL.
36 * ***** END LICENSE BLOCK ***** */
38 #include "nsStreamTransportService.h"
39 #include "nsXPCOMCIDInternal.h"
40 #include "nsNetSegmentUtils.h"
41 #include "nsAutoLock.h"
43 #include "nsTransportUtils.h"
44 #include "nsStreamUtils.h"
45 #include "nsNetError.h"
48 #include "nsIServiceManager.h"
49 #include "nsIAsyncInputStream.h"
50 #include "nsIAsyncOutputStream.h"
51 #include "nsISeekableStream.h"
53 #include "nsITransport.h"
54 #include "nsIRunnable.h"
55 #include "nsIObserverService.h"
57 //-----------------------------------------------------------------------------
58 // nsInputStreamTransport
60 // Implements nsIInputStream as a wrapper around the real input stream. This
61 // allows the transport to support seeking, range-limiting, progress reporting,
62 // and close-when-done semantics while utilizing NS_AsyncCopy.
63 //-----------------------------------------------------------------------------
65 class nsInputStreamTransport
: public nsITransport
66 , public nsIInputStream
71 NS_DECL_NSIINPUTSTREAM
73 nsInputStreamTransport(nsIInputStream
*source
,
80 , mCloseWhenDone(closeWhenDone
)
82 , mInProgress(PR_FALSE
)
86 virtual ~nsInputStreamTransport()
91 nsCOMPtr
<nsIAsyncInputStream
> mPipeIn
;
93 // while the copy is active, these members may only be accessed from the
94 // nsIInputStream implementation.
95 nsCOMPtr
<nsITransportEventSink
> mEventSink
;
96 nsCOMPtr
<nsIInputStream
> mSource
;
99 PRPackedBool mCloseWhenDone
;
100 PRPackedBool mFirstTime
;
102 // this variable serves as a lock to prevent the state of the transport
103 // from being modified once the copy is in progress.
104 PRPackedBool mInProgress
;
107 NS_IMPL_THREADSAFE_ISUPPORTS2(nsInputStreamTransport
,
114 nsInputStreamTransport::OpenInputStream(PRUint32 flags
,
117 nsIInputStream
**result
)
119 NS_ENSURE_TRUE(!mInProgress
, NS_ERROR_IN_PROGRESS
);
122 nsCOMPtr
<nsIEventTarget
> target
=
123 do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID
, &rv
);
124 if (NS_FAILED(rv
)) return rv
;
126 // XXX if the caller requests an unbuffered stream, then perhaps
127 // we'd want to simply return mSource; however, then we would
128 // not be reading mSource on a background thread. is this ok?
130 PRBool nonblocking
= !(flags
& OPEN_BLOCKING
);
132 net_ResolveSegmentParams(segsize
, segcount
);
133 nsIMemory
*segalloc
= net_GetSegmentAlloc(segsize
);
135 nsCOMPtr
<nsIAsyncOutputStream
> pipeOut
;
136 rv
= NS_NewPipe2(getter_AddRefs(mPipeIn
),
137 getter_AddRefs(pipeOut
),
138 nonblocking
, PR_TRUE
,
139 segsize
, segcount
, segalloc
);
140 if (NS_FAILED(rv
)) return rv
;
142 mInProgress
= PR_TRUE
;
144 // startup async copy process...
145 rv
= NS_AsyncCopy(this, pipeOut
, target
,
146 NS_ASYNCCOPY_VIA_WRITESEGMENTS
, segsize
);
147 if (NS_SUCCEEDED(rv
))
148 NS_ADDREF(*result
= mPipeIn
);
154 nsInputStreamTransport::OpenOutputStream(PRUint32 flags
,
157 nsIOutputStream
**result
)
159 // this transport only supports reading!
160 NS_NOTREACHED("nsInputStreamTransport::OpenOutputStream");
161 return NS_ERROR_UNEXPECTED
;
165 nsInputStreamTransport::Close(nsresult reason
)
167 if (NS_SUCCEEDED(reason
))
168 reason
= NS_BASE_STREAM_CLOSED
;
170 return mPipeIn
->CloseWithStatus(reason
);
174 nsInputStreamTransport::SetEventSink(nsITransportEventSink
*sink
,
175 nsIEventTarget
*target
)
177 NS_ENSURE_TRUE(!mInProgress
, NS_ERROR_IN_PROGRESS
);
180 return net_NewTransportEventSinkProxy(getter_AddRefs(mEventSink
),
187 /** nsIInputStream **/
190 nsInputStreamTransport::Close()
195 // make additional reads return early...
196 mOffset
= mLimit
= 0;
201 nsInputStreamTransport::Available(PRUint32
*result
)
203 return NS_ERROR_NOT_IMPLEMENTED
;
207 nsInputStreamTransport::Read(char *buf
, PRUint32 count
, PRUint32
*result
)
210 mFirstTime
= PR_FALSE
;
211 if (mOffset
!= nsUint64(0)) {
212 // read from current position if offset equal to max
213 if (mOffset
!= LL_MAXUINT
) {
214 nsCOMPtr
<nsISeekableStream
> seekable
= do_QueryInterface(mSource
);
215 // Note: The casts to PRUint64 are needed to cast to PRInt64, as
216 // nsUint64 can't directly be cast to PRInt64
218 seekable
->Seek(nsISeekableStream::NS_SEEK_SET
, PRUint64(mOffset
));
220 // reset offset to zero so we can use it to enforce limit
226 PRUint32 max
= mLimit
- mOffset
;
235 nsresult rv
= mSource
->Read(buf
, count
, result
);
237 if (NS_SUCCEEDED(rv
)) {
240 mEventSink
->OnTransportStatus(this, STATUS_READING
, mOffset
, mLimit
);
246 nsInputStreamTransport::ReadSegments(nsWriteSegmentFun writer
, void *closure
,
247 PRUint32 count
, PRUint32
*result
)
249 return NS_ERROR_NOT_IMPLEMENTED
;
253 nsInputStreamTransport::IsNonBlocking(PRBool
*result
)
259 //-----------------------------------------------------------------------------
260 // nsOutputStreamTransport
262 // Implements nsIOutputStream as a wrapper around the real input stream. This
263 // allows the transport to support seeking, range-limiting, progress reporting,
264 // and close-when-done semantics while utilizing NS_AsyncCopy.
265 //-----------------------------------------------------------------------------
267 class nsOutputStreamTransport
: public nsITransport
268 , public nsIOutputStream
273 NS_DECL_NSIOUTPUTSTREAM
275 nsOutputStreamTransport(nsIOutputStream
*sink
,
278 PRBool closeWhenDone
)
282 , mCloseWhenDone(closeWhenDone
)
283 , mFirstTime(PR_TRUE
)
284 , mInProgress(PR_FALSE
)
288 virtual ~nsOutputStreamTransport()
293 nsCOMPtr
<nsIAsyncOutputStream
> mPipeOut
;
295 // while the copy is active, these members may only be accessed from the
296 // nsIOutputStream implementation.
297 nsCOMPtr
<nsITransportEventSink
> mEventSink
;
298 nsCOMPtr
<nsIOutputStream
> mSink
;
301 PRPackedBool mCloseWhenDone
;
302 PRPackedBool mFirstTime
;
304 // this variable serves as a lock to prevent the state of the transport
305 // from being modified once the copy is in progress.
306 PRPackedBool mInProgress
;
309 NS_IMPL_THREADSAFE_ISUPPORTS2(nsOutputStreamTransport
,
316 nsOutputStreamTransport::OpenInputStream(PRUint32 flags
,
319 nsIInputStream
**result
)
321 // this transport only supports writing!
322 NS_NOTREACHED("nsOutputStreamTransport::OpenInputStream");
323 return NS_ERROR_UNEXPECTED
;
327 nsOutputStreamTransport::OpenOutputStream(PRUint32 flags
,
330 nsIOutputStream
**result
)
332 NS_ENSURE_TRUE(!mInProgress
, NS_ERROR_IN_PROGRESS
);
335 nsCOMPtr
<nsIEventTarget
> target
=
336 do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID
, &rv
);
337 if (NS_FAILED(rv
)) return rv
;
339 // XXX if the caller requests an unbuffered stream, then perhaps
340 // we'd want to simply return mSink; however, then we would
341 // not be writing to mSink on a background thread. is this ok?
343 PRBool nonblocking
= !(flags
& OPEN_BLOCKING
);
345 net_ResolveSegmentParams(segsize
, segcount
);
346 nsIMemory
*segalloc
= net_GetSegmentAlloc(segsize
);
348 nsCOMPtr
<nsIAsyncInputStream
> pipeIn
;
349 rv
= NS_NewPipe2(getter_AddRefs(pipeIn
),
350 getter_AddRefs(mPipeOut
),
351 PR_TRUE
, nonblocking
,
352 segsize
, segcount
, segalloc
);
353 if (NS_FAILED(rv
)) return rv
;
355 mInProgress
= PR_TRUE
;
357 // startup async copy process...
358 rv
= NS_AsyncCopy(pipeIn
, this, target
,
359 NS_ASYNCCOPY_VIA_READSEGMENTS
, segsize
);
360 if (NS_SUCCEEDED(rv
))
361 NS_ADDREF(*result
= mPipeOut
);
367 nsOutputStreamTransport::Close(nsresult reason
)
369 if (NS_SUCCEEDED(reason
))
370 reason
= NS_BASE_STREAM_CLOSED
;
372 return mPipeOut
->CloseWithStatus(reason
);
376 nsOutputStreamTransport::SetEventSink(nsITransportEventSink
*sink
,
377 nsIEventTarget
*target
)
379 NS_ENSURE_TRUE(!mInProgress
, NS_ERROR_IN_PROGRESS
);
382 return net_NewTransportEventSinkProxy(getter_AddRefs(mEventSink
),
389 /** nsIOutputStream **/
392 nsOutputStreamTransport::Close()
397 // make additional writes return early...
398 mOffset
= mLimit
= 0;
403 nsOutputStreamTransport::Flush()
409 nsOutputStreamTransport::Write(const char *buf
, PRUint32 count
, PRUint32
*result
)
412 mFirstTime
= PR_FALSE
;
413 if (mOffset
!= nsUint64(0)) {
414 // write to current position if offset equal to max
415 if (mOffset
!= LL_MAXUINT
) {
416 nsCOMPtr
<nsISeekableStream
> seekable
= do_QueryInterface(mSink
);
417 // Note: The casts to PRUint64 are needed to cast to PRInt64, as
418 // nsUint64 can't directly be cast to PRInt64
420 seekable
->Seek(nsISeekableStream::NS_SEEK_SET
, PRUint64(mOffset
));
422 // reset offset to zero so we can use it to enforce limit
427 // limit amount written
428 PRUint32 max
= mLimit
- mOffset
;
437 nsresult rv
= mSink
->Write(buf
, count
, result
);
439 if (NS_SUCCEEDED(rv
)) {
442 mEventSink
->OnTransportStatus(this, STATUS_WRITING
, mOffset
, mLimit
);
448 nsOutputStreamTransport::WriteSegments(nsReadSegmentFun reader
, void *closure
,
449 PRUint32 count
, PRUint32
*result
)
451 return NS_ERROR_NOT_IMPLEMENTED
;
455 nsOutputStreamTransport::WriteFrom(nsIInputStream
*in
, PRUint32 count
, PRUint32
*result
)
457 return NS_ERROR_NOT_IMPLEMENTED
;
461 nsOutputStreamTransport::IsNonBlocking(PRBool
*result
)
467 //-----------------------------------------------------------------------------
468 // nsStreamTransportService
469 //-----------------------------------------------------------------------------
471 nsStreamTransportService::~nsStreamTransportService()
473 NS_ASSERTION(!mPool
, "thread pool wasn't shutdown");
477 nsStreamTransportService::Init()
479 mPool
= do_CreateInstance(NS_THREADPOOL_CONTRACTID
);
480 NS_ENSURE_STATE(mPool
);
482 // Configure the pool
483 mPool
->SetThreadLimit(4);
484 mPool
->SetIdleThreadLimit(1);
485 mPool
->SetIdleThreadTimeout(PR_SecondsToInterval(60));
487 nsCOMPtr
<nsIObserverService
> obsSvc
=
488 do_GetService("@mozilla.org/observer-service;1");
490 obsSvc
->AddObserver(this, "xpcom-shutdown-threads", PR_FALSE
);
494 NS_IMPL_THREADSAFE_ISUPPORTS3(nsStreamTransportService
,
495 nsIStreamTransportService
,
500 nsStreamTransportService::Dispatch(nsIRunnable
*task
, PRUint32 flags
)
502 NS_ENSURE_TRUE(mPool
, NS_ERROR_NOT_INITIALIZED
);
503 return mPool
->Dispatch(task
, flags
);
507 nsStreamTransportService::IsOnCurrentThread(PRBool
*result
)
509 NS_ENSURE_TRUE(mPool
, NS_ERROR_NOT_INITIALIZED
);
510 return mPool
->IsOnCurrentThread(result
);
514 nsStreamTransportService::CreateInputTransport(nsIInputStream
*stream
,
517 PRBool closeWhenDone
,
518 nsITransport
**result
)
520 nsInputStreamTransport
*trans
=
521 new nsInputStreamTransport(stream
, offset
, limit
, closeWhenDone
);
523 return NS_ERROR_OUT_OF_MEMORY
;
524 NS_ADDREF(*result
= trans
);
529 nsStreamTransportService::CreateOutputTransport(nsIOutputStream
*stream
,
532 PRBool closeWhenDone
,
533 nsITransport
**result
)
535 nsOutputStreamTransport
*trans
=
536 new nsOutputStreamTransport(stream
, offset
, limit
, closeWhenDone
);
538 return NS_ERROR_OUT_OF_MEMORY
;
539 NS_ADDREF(*result
= trans
);
544 nsStreamTransportService::Observe(nsISupports
*subject
, const char *topic
,
545 const PRUnichar
*data
)
547 NS_ASSERTION(strcmp(topic
, "xpcom-shutdown-threads") == 0, "oops");