Backed out changeset b71c8c052463 (bug 1943846) for causing mass failures. CLOSED...
[gecko.git] / remote / marionette / stream-utils.sys.mjs
blob59792806600025409eaab4830128c7d334ec56db
1 /* This Source Code Form is subject to the terms of the Mozilla Public
2  * License, v. 2.0. If a copy of the MPL was not distributed with this
3  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
5 import { XPCOMUtils } from "resource://gre/modules/XPCOMUtils.sys.mjs";
7 const lazy = {};
9 ChromeUtils.defineESModuleGetters(lazy, {
10   EventEmitter: "resource://gre/modules/EventEmitter.sys.mjs",
11 });
13 XPCOMUtils.defineLazyServiceGetter(
14   lazy,
15   "IOUtil",
16   "@mozilla.org/io-util;1",
17   "nsIIOUtil"
20 ChromeUtils.defineLazyGetter(lazy, "ScriptableInputStream", () => {
21   return Components.Constructor(
22     "@mozilla.org/scriptableinputstream;1",
23     "nsIScriptableInputStream",
24     "init"
25   );
26 });
28 const BUFFER_SIZE = 0x8000;
30 /**
31  * This helper function (and its companion object) are used by bulk
32  * senders and receivers to read and write data in and out of other streams.
33  * Functions that make use of this tool are passed to callers when it is
34  * time to read or write bulk data.  It is highly recommended to use these
35  * copier functions instead of the stream directly because the copier
36  * enforces the agreed upon length. Since bulk mode reuses an existing
37  * stream, the sender and receiver must write and read exactly the agreed
38  * upon amount of data, or else the entire transport will be left in a
39  * invalid state.  Additionally, other methods of stream copying (such as
40  * NetUtil.asyncCopy) close the streams involved, which would terminate
41  * the debugging transport, and so it is avoided here.
42  *
43  * Overall, this *works*, but clearly the optimal solution would be
44  * able to just use the streams directly.  If it were possible to fully
45  * implement nsIInputStream/nsIOutputStream in JS, wrapper streams could
46  * be created to enforce the length and avoid closing, and consumers could
47  * use familiar stream utilities like NetUtil.asyncCopy.
48  *
49  * The function takes two async streams and copies a precise number
50  * of bytes from one to the other.  Copying begins immediately, but may
51  * complete at some future time depending on data size.  Use the returned
52  * promise to know when it's complete.
53  *
54  * @param {nsIAsyncInputStream} input
55  *     Stream to copy from.
56  * @param {nsIAsyncOutputStream} output
57  *        Stream to copy to.
58  * @param {number} length
59  *        Amount of data that needs to be copied.
60  *
61  * @returns {Promise}
62  *     Promise is resolved when copying completes or rejected if any
63  *     (unexpected) errors occur.
64  */
65 function copyStream(input, output, length) {
66   let copier = new StreamCopier(input, output, length);
67   return copier.copy();
70 /** @class */
71 function StreamCopier(input, output, length) {
72   lazy.EventEmitter.decorate(this);
73   this._id = StreamCopier._nextId++;
74   this.input = input;
75   // Save off the base output stream, since we know it's async as we've
76   // required
77   this.baseAsyncOutput = output;
78   if (lazy.IOUtil.outputStreamIsBuffered(output)) {
79     this.output = output;
80   } else {
81     this.output = Cc[
82       "@mozilla.org/network/buffered-output-stream;1"
83     ].createInstance(Ci.nsIBufferedOutputStream);
84     this.output.init(output, BUFFER_SIZE);
85   }
86   this._length = length;
87   this._amountLeft = length;
88   this._deferred = {
89     promise: new Promise((resolve, reject) => {
90       this._deferred.resolve = resolve;
91       this._deferred.reject = reject;
92     }),
93   };
95   this._copy = this._copy.bind(this);
96   this._flush = this._flush.bind(this);
97   this._destroy = this._destroy.bind(this);
99   // Copy promise's then method up to this object.
100   //
101   // Allows the copier to offer a promise interface for the simple succeed
102   // or fail scenarios, but also emit events (due to the EventEmitter)
103   // for other states, like progress.
104   this.then = this._deferred.promise.then.bind(this._deferred.promise);
105   this.then(this._destroy, this._destroy);
107   // Stream ready callback starts as |_copy|, but may switch to |_flush|
108   // at end if flushing would block the output stream.
109   this._streamReadyCallback = this._copy;
111 StreamCopier._nextId = 0;
113 StreamCopier.prototype = {
114   copy() {
115     // Dispatch to the next tick so that it's possible to attach a progress
116     // event listener, even for extremely fast copies (like when testing).
117     Services.tm.currentThread.dispatch(() => {
118       try {
119         this._copy();
120       } catch (e) {
121         this._deferred.reject(e);
122       }
123     }, 0);
124     return this;
125   },
127   _copy() {
128     let bytesAvailable = this.input.available();
129     let amountToCopy = Math.min(bytesAvailable, this._amountLeft);
130     this._debug("Trying to copy: " + amountToCopy);
132     let bytesCopied;
133     try {
134       bytesCopied = this.output.writeFrom(this.input, amountToCopy);
135     } catch (e) {
136       if (e.result == Cr.NS_BASE_STREAM_WOULD_BLOCK) {
137         this._debug("Base stream would block, will retry");
138         this._debug("Waiting for output stream");
139         this.baseAsyncOutput.asyncWait(this, 0, 0, Services.tm.currentThread);
140         return;
141       }
142       throw e;
143     }
145     this._amountLeft -= bytesCopied;
146     this._debug("Copied: " + bytesCopied + ", Left: " + this._amountLeft);
147     this._emitProgress();
149     if (this._amountLeft === 0) {
150       this._debug("Copy done!");
151       this._flush();
152       return;
153     }
155     this._debug("Waiting for input stream");
156     this.input.asyncWait(this, 0, 0, Services.tm.currentThread);
157   },
159   _emitProgress() {
160     this.emit("progress", {
161       bytesSent: this._length - this._amountLeft,
162       totalBytes: this._length,
163     });
164   },
166   _flush() {
167     try {
168       this.output.flush();
169     } catch (e) {
170       if (
171         e.result == Cr.NS_BASE_STREAM_WOULD_BLOCK ||
172         e.result == Cr.NS_ERROR_FAILURE
173       ) {
174         this._debug("Flush would block, will retry");
175         this._streamReadyCallback = this._flush;
176         this._debug("Waiting for output stream");
177         this.baseAsyncOutput.asyncWait(this, 0, 0, Services.tm.currentThread);
178         return;
179       }
180       throw e;
181     }
182     this._deferred.resolve();
183   },
185   _destroy() {
186     this._destroy = null;
187     this._copy = null;
188     this._flush = null;
189     this.input = null;
190     this.output = null;
191   },
193   // nsIInputStreamCallback
194   onInputStreamReady() {
195     this._streamReadyCallback();
196   },
198   // nsIOutputStreamCallback
199   onOutputStreamReady() {
200     this._streamReadyCallback();
201   },
203   _debug() {},
207  * Read from a stream, one byte at a time, up to the next
208  * <var>delimiter</var> character, but stopping if we've read |count|
209  * without finding it.  Reading also terminates early if there are less
210  * than <var>count</var> bytes available on the stream.  In that case,
211  * we only read as many bytes as the stream currently has to offer.
213  * @param {nsIInputStream} stream
214  *     Input stream to read from.
215  * @param {string} delimiter
216  *     Character we're trying to find.
217  * @param {number} count
218  *     Max number of characters to read while searching.
220  * @returns {string}
221  *     Collected data.  If the delimiter was found, this string will
222  *     end with it.
223  */
224 // TODO: This implementation could be removed if bug 984651 is fixed,
225 // which provides a native version of the same idea.
226 function delimitedRead(stream, delimiter, count) {
227   let scriptableStream;
228   if (stream instanceof Ci.nsIScriptableInputStream) {
229     scriptableStream = stream;
230   } else {
231     scriptableStream = new lazy.ScriptableInputStream(stream);
232   }
234   let data = "";
236   // Don't exceed what's available on the stream
237   count = Math.min(count, stream.available());
239   if (count <= 0) {
240     return data;
241   }
243   let char;
244   while (char !== delimiter && count > 0) {
245     char = scriptableStream.readBytes(1);
246     count--;
247     data += char;
248   }
250   return data;
253 export const StreamUtils = {
254   copyStream,
255   delimitedRead,