devtools/shared/transport/stream-utils.js

/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

"use strict";

const DevToolsUtils = require("resource://devtools/shared/DevToolsUtils.js");
const { dumpv } = DevToolsUtils;
const EventEmitter = require("resource://devtools/shared/event-emitter.js");

DevToolsUtils.defineLazyGetter(this, "IOUtil", () => {
  return Cc["@mozilla.org/io-util;1"].getService(Ci.nsIIOUtil);
});

DevToolsUtils.defineLazyGetter(this, "ScriptableInputStream", () => {
  return Components.Constructor(
    "@mozilla.org/scriptableinputstream;1",
    "nsIScriptableInputStream",
    "init"
  );
});

const BUFFER_SIZE = 0x8000;

/**
 * This helper function (and its companion object) is used by bulk senders and
 * receivers to read and write data in and out of other streams. Functions that
 * make use of this tool are passed to callers when it is time to read or write
 * bulk data. It is highly recommended to use these copier functions instead of
 * the stream directly because the copier enforces the agreed upon length.
 * Since bulk mode reuses an existing stream, the sender and receiver must write
 * and read exactly the agreed upon amount of data, or else the entire transport
 * will be left in an invalid state. Additionally, other methods of stream
 * copying (such as NetUtil.asyncCopy) close the streams involved, which would
 * terminate the debugging transport, so they are avoided here.
 *
 * Overall, this *works*, but clearly the optimal solution would be to just use
 * the streams directly. If it were possible to fully implement
 * nsIInputStream / nsIOutputStream in JS, wrapper streams could be created to
 * enforce the length and avoid closing, and consumers could use familiar stream
 * utilities like NetUtil.asyncCopy.
 *
 * The function takes two async streams and copies a precise number of bytes
 * from one to the other. Copying begins immediately, but may complete at some
 * future time depending on data size. Use the returned promise to know when
 * it's complete. (An illustrative usage sketch follows the function below.)
 *
 * @param input nsIAsyncInputStream
 *        The stream to copy from.
 * @param output nsIAsyncOutputStream
 *        The stream to copy to.
 * @param length Integer
 *        The amount of data that needs to be copied.
 * @return Promise
 *         The promise is resolved when copying completes or rejected if any
 *         (unexpected) errors occur.
 */
function copyStream(input, output, length) {
  const copier = new StreamCopier(input, output, length);
  return copier.copy();
}
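
// Illustrative usage sketch (not part of this module): a hypothetical bulk
// sender could use the copier returned by copyStream both as a promise and as
// a progress emitter. The stream names and byte count below are invented for
// the example.
//
//   async function sendBulkData(asyncInput, asyncOutput, byteLength) {
//     const copier = copyStream(asyncInput, asyncOutput, byteLength);
//     copier.on("progress", ({ bytesSent, totalBytes }) => {
//       dumpv("Sent " + bytesSent + " of " + totalBytes + " bytes");
//     });
//     // Resolves once exactly |byteLength| bytes have been copied and the
//     // buffered output has been flushed; rejects on unexpected errors.
//     await copier;
//   }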

function StreamCopier(input, output, length) {
  EventEmitter.decorate(this);
  this._id = StreamCopier._nextId++;
  this.input = input;
  // Save off the base output stream, since we know it's async as we've required
  this.baseAsyncOutput = output;
  if (IOUtil.outputStreamIsBuffered(output)) {
    this.output = output;
  } else {
    this.output = Cc[
      "@mozilla.org/network/buffered-output-stream;1"
    ].createInstance(Ci.nsIBufferedOutputStream);
    this.output.init(output, BUFFER_SIZE);
  }
  this._length = length;
  this._amountLeft = length;

  let _resolve;
  let _reject;
  this._deferred = new Promise((resolve, reject) => {
    _resolve = resolve;
    _reject = reject;
  });
  this._deferred.resolve = _resolve;
  this._deferred.reject = _reject;

  this._copy = this._copy.bind(this);
  this._flush = this._flush.bind(this);
  this._destroy = this._destroy.bind(this);

  // Copy the promise's then method up to this object. This allows the copier
  // to offer a promise interface for the simple success or failure scenarios,
  // while also emitting events (via the EventEmitter) for other states, like
  // progress.
  this.then = this._deferred.then.bind(this._deferred);
  this.then(this._destroy, this._destroy);

  // The stream-ready callback starts as |_copy|, but may switch to |_flush| at
  // the end if flushing would block the output stream.
  this._streamReadyCallback = this._copy;
}
StreamCopier._nextId = 0;

StreamCopier.prototype = {
  copy() {
    // Dispatch to the next tick so that it's possible to attach a progress
    // event listener, even for extremely fast copies (like when testing).
    Services.tm.dispatchToMainThread(() => {
      try {
        this._copy();
      } catch (e) {
        this._deferred.reject(e);
      }
    });
    return this;
  },

  _copy() {
    const bytesAvailable = this.input.available();
    const amountToCopy = Math.min(bytesAvailable, this._amountLeft);
    this._debug("Trying to copy: " + amountToCopy);

    let bytesCopied;
    try {
      bytesCopied = this.output.writeFrom(this.input, amountToCopy);
    } catch (e) {
      if (e.result == Cr.NS_BASE_STREAM_WOULD_BLOCK) {
        this._debug("Base stream would block, will retry");
        this._debug("Waiting for output stream");
        this.baseAsyncOutput.asyncWait(this, 0, 0, Services.tm.currentThread);
        return;
      }
      throw e;
    }

    this._amountLeft -= bytesCopied;
    this._debug("Copied: " + bytesCopied + ", Left: " + this._amountLeft);
    this._emitProgress();

    if (this._amountLeft === 0) {
      this._debug("Copy done!");
      this._flush();
      return;
    }

    this._debug("Waiting for input stream");
    this.input.asyncWait(this, 0, 0, Services.tm.currentThread);
  },

  _emitProgress() {
    this.emit("progress", {
      bytesSent: this._length - this._amountLeft,
      totalBytes: this._length,
    });
  },

  _flush() {
    try {
      this.output.flush();
    } catch (e) {
      if (
        e.result == Cr.NS_BASE_STREAM_WOULD_BLOCK ||
        e.result == Cr.NS_ERROR_FAILURE
      ) {
        this._debug("Flush would block, will retry");
        this._streamReadyCallback = this._flush;
        this._debug("Waiting for output stream");
        this.baseAsyncOutput.asyncWait(this, 0, 0, Services.tm.currentThread);
        return;
      }
      throw e;
    }
    this._deferred.resolve();
  },

  _destroy() {
    this._destroy = null;
    this._copy = null;
    this._flush = null;
    this.input = null;
    this.output = null;
  },

  // nsIInputStreamCallback
  onInputStreamReady() {
    this._streamReadyCallback();
  },

  // nsIOutputStreamCallback
  onOutputStreamReady() {
    this._streamReadyCallback();
  },

  _debug(msg) {
    // Prefix logs with the copier ID, which makes logs much easier to
    // understand when several copiers are running simultaneously.
    dumpv("Copier: " + this._id + " " + msg);
  },
};

/**
 * Read from a stream, one byte at a time, up to the next |delimiter|
 * character, but stopping if we've read |count| bytes without finding it.
 * Reading also terminates early if there are fewer than |count| bytes
 * available on the stream. In that case, we only read as many bytes as the
 * stream currently has to offer. (An illustrative usage sketch follows the
 * function below.)
 * TODO: This implementation could be removed if bug 984651 is fixed, which
 *       provides a native version of the same idea.
 * @param stream nsIInputStream
 *        The input stream to read from.
 * @param delimiter string
 *        The character we're trying to find.
 * @param count integer
 *        The max number of characters to read while searching.
 * @return string
 *         The data collected. If the delimiter was found, this string will
 *         end with it.
 */
function delimitedRead(stream, delimiter, count) {
  dumpv(
    "Starting delimited read for " + delimiter + " up to " + count + " bytes"
  );

  let scriptableStream;
  if (stream instanceof Ci.nsIScriptableInputStream) {
    scriptableStream = stream;
  } else {
    scriptableStream = new ScriptableInputStream(stream);
  }

  let data = "";

  // Don't exceed what's available on the stream
  count = Math.min(count, stream.available());
  if (count <= 0) {
    return data;
  }

  let char;
  while (char !== delimiter && count > 0) {
    char = scriptableStream.readBytes(1);
    count--;
    data += char;
  }

  return data;
}
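
// Illustrative usage sketch (not part of this module): read a header from a
// hypothetical |rawInputStream| up to a ":" delimiter, giving up after at
// most 20 bytes.
//
//   const header = delimitedRead(rawInputStream, ":", 20);
//   if (header.endsWith(":")) {
//     // The delimiter was found; everything before it is the header value.
//     const value = header.slice(0, -1);
//   }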

module.exports = {
  copyStream,
  delimitedRead,
};