1 /* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 const DevToolsUtils
= require("resource://devtools/shared/DevToolsUtils.js");
8 const { dumpv
} = DevToolsUtils
;
9 const EventEmitter
= require("resource://devtools/shared/event-emitter.js");
// Registers a lazy getter named "IOUtil" on |this|. The callback obtains the
// "@mozilla.org/io-util;1" service through the nsIIOUtil interface.
11 DevToolsUtils
.defineLazyGetter(this, "IOUtil", () => {
12 return Cc
["@mozilla.org/io-util;1"].getService(Ci
.nsIIOUtil
);
// Registers a lazy getter named "ScriptableInputStream" on |this|. The
// callback builds a constructor, via Components.Constructor, for the
// "@mozilla.org/scriptableinputstream;1" contract with the
// nsIScriptableInputStream interface.
15 DevToolsUtils
.defineLazyGetter(this, "ScriptableInputStream", () => {
16 return Components
.Constructor(
17 "@mozilla.org/scriptableinputstream;1",
18 "nsIScriptableInputStream",
// Buffer size (0x8000 = 32768) that StreamCopier passes as the second argument
// to this.output.init().
23 const BUFFER_SIZE
= 0x8000;
26 * This helper function (and its companion object) are used by bulk senders and
27 * receivers to read and write data in and out of other streams. Functions that
28 * make use of this tool are passed to callers when it is time to read or write
29 * bulk data. It is highly recommended to use these copier functions instead of
30 * the stream directly because the copier enforces the agreed upon length.
31 * Since bulk mode reuses an existing stream, the sender and receiver must write
32 * and read exactly the agreed upon amount of data, or else the entire transport
33 * will be left in an invalid state. Additionally, other methods of stream
34 * copying (such as NetUtil.asyncCopy) close the streams involved, which would
35 * terminate the debugging transport, and so it is avoided here.
37 * Overall, this *works*, but clearly the optimal solution would be able to just
38 * use the streams directly. If it were possible to fully implement
39 * nsIInputStream / nsIOutputStream in JS, wrapper streams could be created to
40 * enforce the length and avoid closing, and consumers could use familiar stream
41 * utilities like NetUtil.asyncCopy.
43 * The function takes two async streams and copies a precise number of bytes
44 * from one to the other. Copying begins immediately, but may complete at some
45 * future time depending on data size. Use the returned promise to know when copying has finished.
48 * @param input nsIAsyncInputStream
49 * The stream to copy from.
50 * @param output nsIAsyncOutputStream
51 * The stream to copy to.
52 * @param length Integer
53 * The amount of data that needs to be copied.
55 * The promise is resolved when copying completes or rejected if any
56 * (unexpected) errors occur.
58 function copyStream(input
, output
, length
) {
// Hand both streams and the agreed byte count to a new StreamCopier.
59 const copier
= new StreamCopier(input
, output
, length
);
// Constructor for the object that copyStream builds. It receives the same
// (input, output, length) arguments that copyStream was called with.
63 function StreamCopier(input
, output
, length
) {
// Decorate this object with EventEmitter so it can emit events such as
// "progress".
64 EventEmitter
.decorate(this);
// Take the next ID from the shared counter; _debug uses it to prefix log lines.
65 this._id
= StreamCopier
._nextId
++;
67 // Save off the base output stream, since we know it's async as we've required
68 this.baseAsyncOutput
= output
;
// Ask IOUtil whether |output| is already buffered.
69 if (IOUtil
.outputStreamIsBuffered(output
)) {
73 "@mozilla.org/network/buffered-output-stream;1"
74 ].createInstance(Ci
.nsIBufferedOutputStream
);
// init() is given the caller's stream and BUFFER_SIZE; this.output is
// presumably the nsIBufferedOutputStream instance created just above —
// TODO confirm.
75 this.output
.init(output
, BUFFER_SIZE
);
// Keep both the total to copy and the running remainder; both start at
// |length|.
77 this._length
= length
;
78 this._amountLeft
= length
;
81 this._deferred
= new Promise((resolve
, reject
) => {
// Attach the settle functions to the promise object itself so that other
// methods can call this._deferred.resolve() / this._deferred.reject().
// _resolve and _reject are presumably captured from the executor above —
// TODO confirm.
85 this._deferred
.resolve
= _resolve
;
86 this._deferred
.reject
= _reject
;

// Bind these methods to this instance: _copy and _flush get stored in
// _streamReadyCallback, and _destroy is passed to then() below.
88 this._copy
= this._copy
.bind(this);
89 this._flush
= this._flush
.bind(this);
90 this._destroy
= this._destroy
.bind(this);
92 // Copy promise's then method up to this object.
93 // Allows the copier to offer a promise interface for the simple succeed or
94 // fail scenarios, but also emit events (due to the EventEmitter) for other
95 // states, like progress.
96 this.then
= this._deferred
.then
.bind(this._deferred
);
// _destroy is registered for both fulfilment and rejection, so it runs
// whichever way the promise settles.
97 this.then(this._destroy
, this._destroy
);
99 // Stream ready callback starts as |_copy|, but may switch to |_flush| at end
100 // if flushing would block the output stream.
101 this._streamReadyCallback
= this._copy
;
// Shared counter that the StreamCopier constructor reads and post-increments
// to assign each instance's _id. Starts at 0.
103 StreamCopier
._nextId
= 0;
105 StreamCopier
.prototype = {
107 // Dispatch to the next tick so that it's possible to attach a progress
108 // event listener, even for extremely fast copies (like when testing).
109 Services
.tm
.dispatchToMainThread(() => {
// Reject the copier's promise with |e|.
113 this._deferred
.reject(e
);
// Copy no more than the input currently reports as available, and no more
// than the number of bytes still left to copy.
120 const bytesAvailable
= this.input
.available();
121 const amountToCopy
= Math
.min(bytesAvailable
, this._amountLeft
);
122 this._debug("Trying to copy: " + amountToCopy
);
// Move the bytes from the input stream into the output stream.
126 bytesCopied
= this.output
.writeFrom(this.input
, amountToCopy
);
// On NS_BASE_STREAM_WOULD_BLOCK, wait on the base async output stream with
// |this| as the callback object; onOutputStreamReady() then re-enters through
// _streamReadyCallback.
128 if (e
.result
== Cr
.NS_BASE_STREAM_WOULD_BLOCK
) {
129 this._debug("Base stream would block, will retry");
130 this._debug("Waiting for output stream");
131 this.baseAsyncOutput
.asyncWait(this, 0, 0, Services
.tm
.currentThread
);
// Deduct what was just written from the remainder, then report progress.
137 this._amountLeft
-= bytesCopied
;
138 this._debug("Copied: " + bytesCopied
+ ", Left: " + this._amountLeft
);
139 this._emitProgress();
// Nothing is left to copy once the remainder reaches zero.
141 if (this._amountLeft
=== 0) {
142 this._debug("Copy done!");
// Wait for the input stream to become ready, with |this| as the callback
// object; onInputStreamReady() then re-enters through _streamReadyCallback.
147 this._debug("Waiting for input stream");
148 this.input
.asyncWait(this, 0, 0, Services
.tm
.currentThread
);
// "progress" event payload: bytes copied so far (total minus remainder) and
// the total.
152 this.emit("progress", {
153 bytesSent
: this._length
- this._amountLeft
,
154 totalBytes
: this._length
,
// NS_BASE_STREAM_WOULD_BLOCK and NS_ERROR_FAILURE are both handled as
// "flush would block": _flush becomes the stream-ready callback and the
// copier waits on the base async output stream so the flush is retried.
163 e
.result
== Cr
.NS_BASE_STREAM_WOULD_BLOCK
||
164 e
.result
== Cr
.NS_ERROR_FAILURE
166 this._debug("Flush would block, will retry");
167 this._streamReadyCallback
= this._flush
;
168 this._debug("Waiting for output stream");
169 this.baseAsyncOutput
.asyncWait(this, 0, 0, Services
.tm
.currentThread
);
// Fulfil the copier's promise (no value is passed).
174 this._deferred
.resolve();
// Clear the reference to the bound _destroy function.
178 this._destroy
= null;
// Both stream callbacks below call whichever step is currently stored in
// _streamReadyCallback (the constructor sets it to _copy; it may later be
// _flush).
185 // nsIInputStreamCallback
186 onInputStreamReady() {
187 this._streamReadyCallback();
190 // nsIOutputStreamCallback
191 onOutputStreamReady() {
192 this._streamReadyCallback();
196 // Prefix logs with the copier ID, which makes logs much easier to
197 // understand when several copiers are running simultaneously
198 dumpv("Copier: " + this._id
+ " " + msg
);
203 * Read from a stream, one byte at a time, up to the next |delimiter|
204 * character, but stopping if we've read |count| without finding it. Reading
205 * also terminates early if there are fewer than |count| bytes available on the
206 * stream. In that case, we only read as many bytes as the stream currently has
208 * TODO: This implementation could be removed if bug 984651 is fixed, which
209 * provides a native version of the same idea.
210 * @param stream nsIInputStream
211 * The input stream to read from.
212 * @param delimiter string
213 * The character we're trying to find.
214 * @param count integer
215 * The max number of characters to read while searching.
217 * The data collected. If the delimiter was found, this string will
220 function delimitedRead(stream
, delimiter
, count
) {
222 "Starting delimited read for " + delimiter
+ " up to " + count
+ " bytes"
225 let scriptableStream
;
226 if (stream
instanceof Ci
.nsIScriptableInputStream
) {
227 scriptableStream
= stream
;
229 scriptableStream
= new ScriptableInputStream(stream
);
234 // Don't exceed what's available on the stream
235 count
= Math
.min(count
, stream
.available());
242 while (char !== delimiter
&& count
> 0) {
243 char = scriptableStream
.readBytes(1);