Fix typo in 9b54bd30006c008b4a951331b273613d5bac3abf
[pm.git] / xpcom / io / nsMultiplexInputStream.cpp
blob960e210337228b3711859dcb41fedf0402c26304
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
/**
 * The multiplex stream concatenates a list of input streams into a single
 * stream.
 */
12 #include "mozilla/Attributes.h"
13 #include "mozilla/MathAlgorithms.h"
15 #include "base/basictypes.h"
17 #include "nsMultiplexInputStream.h"
18 #include "nsIMultiplexInputStream.h"
19 #include "nsISeekableStream.h"
20 #include "nsCOMPtr.h"
21 #include "nsCOMArray.h"
22 #include "nsIClassInfoImpl.h"
23 #include "nsIIPCSerializableInputStream.h"
24 #include "mozilla/ipc/InputStreamUtils.h"
26 using namespace mozilla::ipc;
28 using mozilla::DeprecatedAbs;
30 class nsMultiplexInputStream final
31 : public nsIMultiplexInputStream
32 , public nsISeekableStream
33 , public nsIIPCSerializableInputStream
35 public:
36 nsMultiplexInputStream();
38 NS_DECL_THREADSAFE_ISUPPORTS
39 NS_DECL_NSIINPUTSTREAM
40 NS_DECL_NSIMULTIPLEXINPUTSTREAM
41 NS_DECL_NSISEEKABLESTREAM
42 NS_DECL_NSIIPCSERIALIZABLEINPUTSTREAM
44 private:
45 ~nsMultiplexInputStream()
49 struct MOZ_STACK_CLASS ReadSegmentsState
51 nsCOMPtr<nsIInputStream> mThisStream;
52 uint32_t mOffset;
53 nsWriteSegmentFun mWriter;
54 void* mClosure;
55 bool mDone;
58 static NS_METHOD ReadSegCb(nsIInputStream* aIn, void* aClosure,
59 const char* aFromRawSegment, uint32_t aToOffset,
60 uint32_t aCount, uint32_t* aWriteCount);
62 nsTArray<nsCOMPtr<nsIInputStream>> mStreams;
63 uint32_t mCurrentStream;
64 bool mStartedReadingCurrent;
65 nsresult mStatus;
68 NS_IMPL_ADDREF(nsMultiplexInputStream)
69 NS_IMPL_RELEASE(nsMultiplexInputStream)
71 NS_IMPL_CLASSINFO(nsMultiplexInputStream, nullptr, nsIClassInfo::THREADSAFE,
72 NS_MULTIPLEXINPUTSTREAM_CID)
74 NS_IMPL_QUERY_INTERFACE_CI(nsMultiplexInputStream,
75 nsIMultiplexInputStream,
76 nsIInputStream,
77 nsISeekableStream,
78 nsIIPCSerializableInputStream)
79 NS_IMPL_CI_INTERFACE_GETTER(nsMultiplexInputStream,
80 nsIMultiplexInputStream,
81 nsIInputStream,
82 nsISeekableStream)
84 static nsresult
85 AvailableMaybeSeek(nsIInputStream* aStream, uint64_t* aResult)
87 nsresult rv = aStream->Available(aResult);
88 if (rv == NS_BASE_STREAM_CLOSED) {
89 // Blindly seek to the current position if Available() returns
90 // NS_BASE_STREAM_CLOSED.
91 // If nsIFileInputStream is closed in Read() due to CLOSE_ON_EOF flag,
92 // Seek() could reopen the file if REOPEN_ON_REWIND flag is set.
93 nsCOMPtr<nsISeekableStream> seekable = do_QueryInterface(aStream);
94 if (seekable) {
95 nsresult rv = seekable->Seek(nsISeekableStream::NS_SEEK_CUR, 0);
96 if (NS_SUCCEEDED(rv)) {
97 rv = aStream->Available(aResult);
101 return rv;
104 static nsresult
105 TellMaybeSeek(nsISeekableStream* aSeekable, int64_t* aResult)
107 nsresult rv = aSeekable->Tell(aResult);
108 if (rv == NS_BASE_STREAM_CLOSED) {
109 // Blindly seek to the current position if Tell() returns
110 // NS_BASE_STREAM_CLOSED.
111 // If nsIFileInputStream is closed in Read() due to CLOSE_ON_EOF flag,
112 // Seek() could reopen the file if REOPEN_ON_REWIND flag is set.
113 nsresult rv = aSeekable->Seek(nsISeekableStream::NS_SEEK_CUR, 0);
114 if (NS_SUCCEEDED(rv)) {
115 rv = aSeekable->Tell(aResult);
118 return rv;
121 nsMultiplexInputStream::nsMultiplexInputStream()
122 : mCurrentStream(0),
123 mStartedReadingCurrent(false),
124 mStatus(NS_OK)
128 /* readonly attribute unsigned long count; */
129 NS_IMETHODIMP
130 nsMultiplexInputStream::GetCount(uint32_t* aCount)
132 *aCount = mStreams.Length();
133 return NS_OK;
#ifdef DEBUG
// Debug-only helper for assertions. Returns false only when |aStream| is
// seekable, Tell() succeeds, and the reported position is non-zero. Streams
// that are not seekable or whose Tell() fails are treated as "at beginning".
static bool
SeekableStreamAtBeginning(nsIInputStream* aStream)
{
  nsCOMPtr<nsISeekableStream> seekable = do_QueryInterface(aStream);
  if (!seekable) {
    return true;
  }

  int64_t offset;
  if (NS_FAILED(seekable->Tell(&offset))) {
    return true;
  }

  return offset == 0;
}
#endif
149 /* void appendStream (in nsIInputStream stream); */
150 NS_IMETHODIMP
151 nsMultiplexInputStream::AppendStream(nsIInputStream* aStream)
153 NS_ASSERTION(SeekableStreamAtBeginning(aStream),
154 "Appended stream not at beginning.");
155 return mStreams.AppendElement(aStream) ? NS_OK : NS_ERROR_OUT_OF_MEMORY;
158 /* void insertStream (in nsIInputStream stream, in unsigned long index); */
159 NS_IMETHODIMP
160 nsMultiplexInputStream::InsertStream(nsIInputStream* aStream, uint32_t aIndex)
162 NS_ASSERTION(SeekableStreamAtBeginning(aStream),
163 "Inserted stream not at beginning.");
164 mStreams.InsertElementAt(aIndex, aStream);
165 if (mCurrentStream > aIndex ||
166 (mCurrentStream == aIndex && mStartedReadingCurrent)) {
167 ++mCurrentStream;
169 return NS_OK;
172 /* void removeStream (in unsigned long index); */
173 NS_IMETHODIMP
174 nsMultiplexInputStream::RemoveStream(uint32_t aIndex)
176 mStreams.RemoveElementAt(aIndex);
177 if (mCurrentStream > aIndex) {
178 --mCurrentStream;
179 } else if (mCurrentStream == aIndex) {
180 mStartedReadingCurrent = false;
183 return NS_OK;
186 /* nsIInputStream getStream (in unsigned long index); */
187 NS_IMETHODIMP
188 nsMultiplexInputStream::GetStream(uint32_t aIndex, nsIInputStream** aResult)
190 *aResult = mStreams.SafeElementAt(aIndex, nullptr);
191 if (NS_WARN_IF(!*aResult)) {
192 return NS_ERROR_NOT_AVAILABLE;
195 NS_ADDREF(*aResult);
196 return NS_OK;
199 /* void close (); */
200 NS_IMETHODIMP
201 nsMultiplexInputStream::Close()
203 mStatus = NS_BASE_STREAM_CLOSED;
205 nsresult rv = NS_OK;
207 uint32_t len = mStreams.Length();
208 for (uint32_t i = 0; i < len; ++i) {
209 nsresult rv2 = mStreams[i]->Close();
210 // We still want to close all streams, but we should return an error
211 if (NS_FAILED(rv2)) {
212 rv = rv2;
215 return rv;
218 /* unsigned long long available (); */
219 NS_IMETHODIMP
220 nsMultiplexInputStream::Available(uint64_t* aResult)
222 if (NS_FAILED(mStatus)) {
223 return mStatus;
226 nsresult rv;
227 uint64_t avail = 0;
229 uint32_t len = mStreams.Length();
230 for (uint32_t i = mCurrentStream; i < len; i++) {
231 uint64_t streamAvail;
232 rv = AvailableMaybeSeek(mStreams[i], &streamAvail);
233 if (NS_WARN_IF(NS_FAILED(rv))) {
234 return rv;
236 avail += streamAvail;
238 *aResult = avail;
239 return NS_OK;
242 /* [noscript] unsigned long read (in charPtr buf, in unsigned long count); */
243 NS_IMETHODIMP
244 nsMultiplexInputStream::Read(char* aBuf, uint32_t aCount, uint32_t* aResult)
246 // It is tempting to implement this method in terms of ReadSegments, but
247 // that would prevent this class from being used with streams that only
248 // implement Read (e.g., file streams).
250 *aResult = 0;
252 if (mStatus == NS_BASE_STREAM_CLOSED) {
253 return NS_OK;
255 if (NS_FAILED(mStatus)) {
256 return mStatus;
259 nsresult rv = NS_OK;
261 uint32_t len = mStreams.Length();
262 while (mCurrentStream < len && aCount) {
263 uint32_t read;
264 rv = mStreams[mCurrentStream]->Read(aBuf, aCount, &read);
266 // XXX some streams return NS_BASE_STREAM_CLOSED to indicate EOF.
267 // (This is a bug in those stream implementations)
268 if (rv == NS_BASE_STREAM_CLOSED) {
269 NS_NOTREACHED("Input stream's Read method returned NS_BASE_STREAM_CLOSED");
270 rv = NS_OK;
271 read = 0;
272 } else if (NS_FAILED(rv)) {
273 break;
276 if (read == 0) {
277 ++mCurrentStream;
278 mStartedReadingCurrent = false;
279 } else {
280 NS_ASSERTION(aCount >= read, "Read more than requested");
281 *aResult += read;
282 aCount -= read;
283 aBuf += read;
284 mStartedReadingCurrent = true;
287 return *aResult ? NS_OK : rv;
290 /* [noscript] unsigned long readSegments (in nsWriteSegmentFun writer,
291 * in voidPtr closure,
292 * in unsigned long count); */
293 NS_IMETHODIMP
294 nsMultiplexInputStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
295 uint32_t aCount, uint32_t* aResult)
297 if (mStatus == NS_BASE_STREAM_CLOSED) {
298 *aResult = 0;
299 return NS_OK;
301 if (NS_FAILED(mStatus)) {
302 return mStatus;
305 NS_ASSERTION(aWriter, "missing aWriter");
307 nsresult rv = NS_OK;
308 ReadSegmentsState state;
309 state.mThisStream = this;
310 state.mOffset = 0;
311 state.mWriter = aWriter;
312 state.mClosure = aClosure;
313 state.mDone = false;
315 uint32_t len = mStreams.Length();
316 while (mCurrentStream < len && aCount) {
317 uint32_t read;
318 rv = mStreams[mCurrentStream]->ReadSegments(ReadSegCb, &state, aCount, &read);
320 // XXX some streams return NS_BASE_STREAM_CLOSED to indicate EOF.
321 // (This is a bug in those stream implementations)
322 if (rv == NS_BASE_STREAM_CLOSED) {
323 NS_NOTREACHED("Input stream's Read method returned NS_BASE_STREAM_CLOSED");
324 rv = NS_OK;
325 read = 0;
328 // if |aWriter| decided to stop reading segments...
329 if (state.mDone || NS_FAILED(rv)) {
330 break;
333 // if stream is empty, then advance to the next stream.
334 if (read == 0) {
335 ++mCurrentStream;
336 mStartedReadingCurrent = false;
337 } else {
338 NS_ASSERTION(aCount >= read, "Read more than requested");
339 state.mOffset += read;
340 aCount -= read;
341 mStartedReadingCurrent = true;
345 // if we successfully read some data, then this call succeeded.
346 *aResult = state.mOffset;
347 return state.mOffset ? NS_OK : rv;
350 NS_METHOD
351 nsMultiplexInputStream::ReadSegCb(nsIInputStream* aIn, void* aClosure,
352 const char* aFromRawSegment,
353 uint32_t aToOffset, uint32_t aCount,
354 uint32_t* aWriteCount)
356 nsresult rv;
357 ReadSegmentsState* state = (ReadSegmentsState*)aClosure;
358 rv = (state->mWriter)(state->mThisStream,
359 state->mClosure,
360 aFromRawSegment,
361 aToOffset + state->mOffset,
362 aCount,
363 aWriteCount);
364 if (NS_FAILED(rv)) {
365 state->mDone = true;
367 return rv;
370 /* readonly attribute boolean nonBlocking; */
371 NS_IMETHODIMP
372 nsMultiplexInputStream::IsNonBlocking(bool* aNonBlocking)
374 uint32_t len = mStreams.Length();
375 if (len == 0) {
376 // Claim to be non-blocking, since we won't block the caller.
377 // On the other hand we'll never return NS_BASE_STREAM_WOULD_BLOCK,
378 // so maybe we should claim to be blocking? It probably doesn't
379 // matter in practice.
380 *aNonBlocking = true;
381 return NS_OK;
383 for (uint32_t i = 0; i < len; ++i) {
384 nsresult rv = mStreams[i]->IsNonBlocking(aNonBlocking);
385 if (NS_WARN_IF(NS_FAILED(rv))) {
386 return rv;
388 // If one is non-blocking the entire stream becomes non-blocking
389 // (except that we don't implement nsIAsyncInputStream, so there's
390 // not much for the caller to do if Read returns "would block")
391 if (*aNonBlocking) {
392 return NS_OK;
395 return NS_OK;
398 /* void seek (in int32_t whence, in int32_t offset); */
399 NS_IMETHODIMP
400 nsMultiplexInputStream::Seek(int32_t aWhence, int64_t aOffset)
402 if (NS_FAILED(mStatus)) {
403 return mStatus;
406 nsresult rv;
408 uint32_t oldCurrentStream = mCurrentStream;
409 bool oldStartedReadingCurrent = mStartedReadingCurrent;
411 if (aWhence == NS_SEEK_SET) {
412 int64_t remaining = aOffset;
413 if (aOffset == 0) {
414 mCurrentStream = 0;
416 for (uint32_t i = 0; i < mStreams.Length(); ++i) {
417 nsCOMPtr<nsISeekableStream> stream =
418 do_QueryInterface(mStreams[i]);
419 if (!stream) {
420 return NS_ERROR_FAILURE;
423 // See if all remaining streams should be rewound
424 if (remaining == 0) {
425 if (i < oldCurrentStream ||
426 (i == oldCurrentStream && oldStartedReadingCurrent)) {
427 rv = stream->Seek(NS_SEEK_SET, 0);
428 if (NS_WARN_IF(NS_FAILED(rv))) {
429 return rv;
431 continue;
432 } else {
433 break;
437 // Get position in current stream
438 int64_t streamPos;
439 if (i > oldCurrentStream ||
440 (i == oldCurrentStream && !oldStartedReadingCurrent)) {
441 streamPos = 0;
442 } else {
443 rv = TellMaybeSeek(stream, &streamPos);
444 if (NS_WARN_IF(NS_FAILED(rv))) {
445 return rv;
449 // See if we need to seek current stream forward or backward
450 if (remaining < streamPos) {
451 rv = stream->Seek(NS_SEEK_SET, remaining);
452 if (NS_WARN_IF(NS_FAILED(rv))) {
453 return rv;
456 mCurrentStream = i;
457 mStartedReadingCurrent = remaining != 0;
459 remaining = 0;
460 } else if (remaining > streamPos) {
461 if (i < oldCurrentStream) {
462 // We're already at end so no need to seek this stream
463 remaining -= streamPos;
464 NS_ASSERTION(remaining >= 0, "Remaining invalid");
465 } else {
466 uint64_t avail;
467 rv = AvailableMaybeSeek(mStreams[i], &avail);
468 if (NS_WARN_IF(NS_FAILED(rv))) {
469 return rv;
472 int64_t newPos = XPCOM_MIN(remaining, streamPos + (int64_t)avail);
474 rv = stream->Seek(NS_SEEK_SET, newPos);
475 if (NS_WARN_IF(NS_FAILED(rv))) {
476 return rv;
479 mCurrentStream = i;
480 mStartedReadingCurrent = true;
482 remaining -= newPos;
483 NS_ASSERTION(remaining >= 0, "Remaining invalid");
485 } else {
486 NS_ASSERTION(remaining == streamPos, "Huh?");
487 remaining = 0;
491 return NS_OK;
494 if (aWhence == NS_SEEK_CUR && aOffset > 0) {
495 int64_t remaining = aOffset;
496 for (uint32_t i = mCurrentStream; remaining && i < mStreams.Length(); ++i) {
497 nsCOMPtr<nsISeekableStream> stream =
498 do_QueryInterface(mStreams[i]);
500 uint64_t avail;
501 rv = AvailableMaybeSeek(mStreams[i], &avail);
502 if (NS_WARN_IF(NS_FAILED(rv))) {
503 return rv;
506 int64_t seek = XPCOM_MIN((int64_t)avail, remaining);
508 rv = stream->Seek(NS_SEEK_CUR, seek);
509 if (NS_WARN_IF(NS_FAILED(rv))) {
510 return rv;
513 mCurrentStream = i;
514 mStartedReadingCurrent = true;
516 remaining -= seek;
519 return NS_OK;
522 if (aWhence == NS_SEEK_CUR && aOffset < 0) {
523 int64_t remaining = -aOffset;
524 for (uint32_t i = mCurrentStream; remaining && i != (uint32_t)-1; --i) {
525 nsCOMPtr<nsISeekableStream> stream =
526 do_QueryInterface(mStreams[i]);
528 int64_t pos;
529 rv = TellMaybeSeek(stream, &pos);
530 if (NS_WARN_IF(NS_FAILED(rv))) {
531 return rv;
534 int64_t seek = XPCOM_MIN(pos, remaining);
536 rv = stream->Seek(NS_SEEK_CUR, -seek);
537 if (NS_WARN_IF(NS_FAILED(rv))) {
538 return rv;
541 mCurrentStream = i;
542 mStartedReadingCurrent = seek != -pos;
544 remaining -= seek;
547 return NS_OK;
550 if (aWhence == NS_SEEK_CUR) {
551 NS_ASSERTION(aOffset == 0, "Should have handled all non-zero values");
553 return NS_OK;
556 if (aWhence == NS_SEEK_END) {
557 if (aOffset > 0) {
558 return NS_ERROR_INVALID_ARG;
560 int64_t remaining = aOffset;
561 for (uint32_t i = mStreams.Length() - 1; i != (uint32_t)-1; --i) {
562 nsCOMPtr<nsISeekableStream> stream =
563 do_QueryInterface(mStreams[i]);
565 // See if all remaining streams should be seeked to end
566 if (remaining == 0) {
567 if (i >= oldCurrentStream) {
568 rv = stream->Seek(NS_SEEK_END, 0);
569 if (NS_WARN_IF(NS_FAILED(rv))) {
570 return rv;
572 } else {
573 break;
577 // Get position in current stream
578 int64_t streamPos;
579 if (i < oldCurrentStream) {
580 streamPos = 0;
581 } else {
582 uint64_t avail;
583 rv = AvailableMaybeSeek(mStreams[i], &avail);
584 if (NS_WARN_IF(NS_FAILED(rv))) {
585 return rv;
588 streamPos = avail;
591 // See if we have enough data in the current stream.
592 if (DeprecatedAbs(remaining) < streamPos) {
593 rv = stream->Seek(NS_SEEK_END, remaining);
594 if (NS_WARN_IF(NS_FAILED(rv))) {
595 return rv;
598 mCurrentStream = i;
599 mStartedReadingCurrent = true;
601 remaining = 0;
602 } else if (DeprecatedAbs(remaining) > streamPos) {
603 if (i > oldCurrentStream ||
604 (i == oldCurrentStream && !oldStartedReadingCurrent)) {
605 // We're already at start so no need to seek this stream
606 remaining += streamPos;
607 } else {
608 int64_t avail;
609 rv = TellMaybeSeek(stream, &avail);
610 if (NS_WARN_IF(NS_FAILED(rv))) {
611 return rv;
614 int64_t newPos = streamPos + XPCOM_MIN(avail, DeprecatedAbs(remaining));
616 rv = stream->Seek(NS_SEEK_END, -newPos);
617 if (NS_WARN_IF(NS_FAILED(rv))) {
618 return rv;
621 mCurrentStream = i;
622 mStartedReadingCurrent = true;
624 remaining += newPos;
626 } else {
627 NS_ASSERTION(remaining == streamPos, "Huh?");
628 remaining = 0;
632 return NS_OK;
635 // other Seeks not implemented yet
636 return NS_ERROR_NOT_IMPLEMENTED;
639 /* uint32_t tell (); */
640 NS_IMETHODIMP
641 nsMultiplexInputStream::Tell(int64_t* aResult)
643 if (NS_FAILED(mStatus)) {
644 return mStatus;
647 nsresult rv;
648 int64_t ret64 = 0;
649 uint32_t i, last;
650 last = mStartedReadingCurrent ? mCurrentStream + 1 : mCurrentStream;
651 for (i = 0; i < last; ++i) {
652 nsCOMPtr<nsISeekableStream> stream = do_QueryInterface(mStreams[i]);
653 if (NS_WARN_IF(!stream)) {
654 return NS_ERROR_NO_INTERFACE;
657 int64_t pos;
658 rv = TellMaybeSeek(stream, &pos);
659 if (NS_WARN_IF(NS_FAILED(rv))) {
660 return rv;
662 ret64 += pos;
664 *aResult = ret64;
666 return NS_OK;
669 /* void setEOF (); */
670 NS_IMETHODIMP
671 nsMultiplexInputStream::SetEOF()
673 return NS_ERROR_NOT_IMPLEMENTED;
676 nsresult
677 nsMultiplexInputStreamConstructor(nsISupports* aOuter,
678 REFNSIID aIID,
679 void** aResult)
681 *aResult = nullptr;
683 if (aOuter) {
684 return NS_ERROR_NO_AGGREGATION;
687 nsMultiplexInputStream* inst = new nsMultiplexInputStream();
688 if (!inst) {
689 return NS_ERROR_OUT_OF_MEMORY;
692 NS_ADDREF(inst);
693 nsresult rv = inst->QueryInterface(aIID, aResult);
694 NS_RELEASE(inst);
696 return rv;
699 void
700 nsMultiplexInputStream::Serialize(InputStreamParams& aParams,
701 FileDescriptorArray& aFileDescriptors)
703 MultiplexInputStreamParams params;
705 uint32_t streamCount = mStreams.Length();
707 if (streamCount) {
708 InfallibleTArray<InputStreamParams>& streams = params.streams();
710 streams.SetCapacity(streamCount);
711 for (uint32_t index = 0; index < streamCount; index++) {
712 InputStreamParams childStreamParams;
713 SerializeInputStream(mStreams[index], childStreamParams,
714 aFileDescriptors);
716 streams.AppendElement(childStreamParams);
720 params.currentStream() = mCurrentStream;
721 params.status() = mStatus;
722 params.startedReadingCurrent() = mStartedReadingCurrent;
724 aParams = params;
727 bool
728 nsMultiplexInputStream::Deserialize(const InputStreamParams& aParams,
729 const FileDescriptorArray& aFileDescriptors)
731 if (aParams.type() !=
732 InputStreamParams::TMultiplexInputStreamParams) {
733 NS_ERROR("Received unknown parameters from the other process!");
734 return false;
737 const MultiplexInputStreamParams& params =
738 aParams.get_MultiplexInputStreamParams();
740 const InfallibleTArray<InputStreamParams>& streams = params.streams();
742 uint32_t streamCount = streams.Length();
743 for (uint32_t index = 0; index < streamCount; index++) {
744 nsCOMPtr<nsIInputStream> stream =
745 DeserializeInputStream(streams[index], aFileDescriptors);
746 if (!stream) {
747 NS_WARNING("Deserialize failed!");
748 return false;
751 if (NS_FAILED(AppendStream(stream))) {
752 NS_WARNING("AppendStream failed!");
753 return false;
757 mCurrentStream = params.currentStream();
758 mStatus = params.status();
759 mStartedReadingCurrent = params.startedReadingCurrent();
761 return true;