1 /* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
5 import { XPCOMUtils } from "resource://gre/modules/XPCOMUtils.sys.mjs";
9 ChromeUtils.defineESModuleGetters(lazy, {
10 EventEmitter: "resource://gre/modules/EventEmitter.sys.mjs",
13 XPCOMUtils.defineLazyServiceGetter(
16 "@mozilla.org/io-util;1",
20 ChromeUtils.defineLazyGetter(lazy, "ScriptableInputStream", () => {
21 return Components.Constructor(
22 "@mozilla.org/scriptableinputstream;1",
23 "nsIScriptableInputStream",
// Size handed to the buffered output stream's init() when StreamCopier wraps
// its output (0x8000 === 32768; presumably a byte count — confirm against
// nsIBufferedOutputStream).
28 const BUFFER_SIZE = 0x8000;
31 * This helper function (and its companion object) are used by bulk
32 * senders and receivers to read and write data in and out of other streams.
33 * Functions that make use of this tool are passed to callers when it is
34 * time to read or write bulk data. It is highly recommended to use these
35 * copier functions instead of the stream directly because the copier
36 * enforces the agreed upon length. Since bulk mode reuses an existing
37 * stream, the sender and receiver must write and read exactly the agreed
38 * upon amount of data, or else the entire transport will be left in an
39 * invalid state. Additionally, other methods of stream copying (such as
40 * NetUtil.asyncCopy) close the streams involved, which would terminate
41 * the debugging transport, and so it is avoided here.
43 * Overall, this *works*, but clearly the optimal solution would be
44 * able to just use the streams directly. If it were possible to fully
45 * implement nsIInputStream/nsIOutputStream in JS, wrapper streams could
46 * be created to enforce the length and avoid closing, and consumers could
47 * use familiar stream utilities like NetUtil.asyncCopy.
49 * The function takes two async streams and copies a precise number
50 * of bytes from one to the other. Copying begins immediately, but may
51 * complete at some future time depending on data size. Use the returned
52 * promise to know when it's complete.
54 * @param {nsIAsyncInputStream} input
55 * Stream to copy from.
56 * @param {nsIAsyncOutputStream} output
58 * @param {number} length
59 * Amount of data that needs to be copied.
62 * Promise is resolved when copying completes or rejected if any
63 * (unexpected) errors occur.
// Entry point for bulk copies: hands both streams and the byte count to a
// new StreamCopier.
65 function copyStream(input, output, length) {
// All three arguments are forwarded unchanged to the StreamCopier constructor.
66 let copier = new StreamCopier(input, output, length);
// Constructor for the copier object: records the streams and the byte budget,
// creates the promise that reports completion, and selects the first
// stream-ready handler.
71 function StreamCopier(input, output, length) {
// Mix EventEmitter methods into this object (this.emit is used for "progress").
72 lazy.EventEmitter.decorate(this);
// Each copier takes the current value of the shared counter as its id.
73 this._id = StreamCopier._nextId++;
75 // Save off the base output stream, since we know it's async as we've
77 this.baseAsyncOutput = output;
// NOTE(review): the branch bodies are only partly visible here. A
// buffered-output-stream instance is created and initialised with the
// original output and BUFFER_SIZE; confirm which branch does the wrapping.
78 if (lazy.IOUtil.outputStreamIsBuffered(output)) {
82 "@mozilla.org/network/buffered-output-stream;1"
83 ].createInstance(Ci.nsIBufferedOutputStream);
84 this.output.init(output, BUFFER_SIZE);
// _length keeps the requested total; _amountLeft is decremented as bytes are
// written and reaches 0 when the copy is complete.
86 this._length = length;
87 this._amountLeft = length;
// Expose the promise's resolve/reject on this._deferred so other methods can
// settle it later.
89 promise: new Promise((resolve, reject) => {
90 this._deferred.resolve = resolve;
91 this._deferred.reject = reject;
// Bind the handlers so they keep this copier as `this` when invoked as
// plain callbacks.
95 this._copy = this._copy.bind(this);
96 this._flush = this._flush.bind(this);
97 this._destroy = this._destroy.bind(this);
99 // Copy promise's then method up to this object.
101 // Allows the copier to offer a promise interface for the simple succeed
102 // or fail scenarios, but also emit events (due to the EventEmitter)
103 // for other states, like progress.
104 this.then = this._deferred.promise.then.bind(this._deferred.promise);
// Run _destroy whether the promise is fulfilled or rejected.
105 this.then(this._destroy, this._destroy);
107 // Stream ready callback starts as |_copy|, but may switch to |_flush|
108 // at end if flushing would block the output stream.
109 this._streamReadyCallback = this._copy;
// Shared counter; the constructor post-increments it to assign each copier's
// _id.
111 StreamCopier._nextId = 0;
// Methods shared by all copiers. NOTE(review): several method headers and the
// try/catch scaffolding are not visible in this view; the comments below
// describe only the statements that are shown.
113 StreamCopier.prototype = {
115 // Dispatch to the next tick so that it's possible to attach a progress
116 // event listener, even for extremely fast copies (like when testing).
117 Services.tm.currentThread.dispatch(() => {
121 this._deferred.reject(e);
// Copy at most what the input reports as available right now, and never more
// than the bytes still owed.
128 let bytesAvailable = this.input.available();
129 let amountToCopy = Math.min(bytesAvailable, this._amountLeft);
130 this._debug("Trying to copy: " + amountToCopy);
134 bytesCopied = this.output.writeFrom(this.input, amountToCopy);
// The write would block: register this object as the callback on the base
// async output stream, targeting the current thread, and retry once
// onOutputStreamReady fires.
136 if (e.result == Cr.NS_BASE_STREAM_WOULD_BLOCK) {
137 this._debug("Base stream would block, will retry");
138 this._debug("Waiting for output stream");
139 this.baseAsyncOutput.asyncWait(this, 0, 0, Services.tm.currentThread);
// Account for the bytes just written and notify progress listeners.
145 this._amountLeft -= bytesCopied;
146 this._debug("Copied: " + bytesCopied + ", Left: " + this._amountLeft);
147 this._emitProgress();
// Nothing left to copy once the remaining count hits exactly zero.
149 if (this._amountLeft === 0) {
150 this._debug("Copy done!");
// Register this object as the callback on the input stream;
// onInputStreamReady then re-enters through _streamReadyCallback.
155 this._debug("Waiting for input stream");
156 this.input.asyncWait(this, 0, 0, Services.tm.currentThread);
// "progress" payload: bytes copied so far (total minus remaining) and the
// requested total.
160 this.emit("progress", {
161 bytesSent: this._length - this._amountLeft,
162 totalBytes: this._length,
// NOTE(review): NS_ERROR_FAILURE is handled exactly like WOULD_BLOCK here
// (both lead to a retry); the reason is not visible in this view.
171 e.result == Cr.NS_BASE_STREAM_WOULD_BLOCK ||
172 e.result == Cr.NS_ERROR_FAILURE
174 this._debug("Flush would block, will retry");
// From now on stream-ready notifications dispatch to _flush instead of
// _copy.
175 this._streamReadyCallback = this._flush;
176 this._debug("Waiting for output stream");
177 this.baseAsyncOutput.asyncWait(this, 0, 0, Services.tm.currentThread);
// Settle the copier's promise successfully (no value is passed).
182 this._deferred.resolve();
186 this._destroy = null;
// Both stream callbacks forward to whichever step (_copy or _flush) is
// currently stored in _streamReadyCallback.
193 // nsIInputStreamCallback
194 onInputStreamReady() {
195 this._streamReadyCallback();
198 // nsIOutputStreamCallback
199 onOutputStreamReady() {
200 this._streamReadyCallback();
207 * Read from a stream, one byte at a time, up to the next
208 * <var>delimiter</var> character, but stopping if we've read |count|
209 without finding it. Reading also terminates early if there are fewer
210 * than <var>count</var> bytes available on the stream. In that case,
211 * we only read as many bytes as the stream currently has to offer.
213 * @param {nsIInputStream} stream
214 * Input stream to read from.
215 * @param {string} delimiter
216 * Character we're trying to find.
217 * @param {number} count
218 * Max number of characters to read while searching.
221 * Collected data. If the delimiter was found, this string will
224 // TODO: This implementation could be removed if bug 984651 is fixed,
225 // which provides a native version of the same idea.
// Reads single bytes from `stream` until `delimiter` is seen or the byte
// budget `count` runs out.
226 function delimitedRead(stream, delimiter, count) {
// Use the stream directly when it already implements
// nsIScriptableInputStream; otherwise a ScriptableInputStream wrapper is
// constructed around it.
227 let scriptableStream;
228 if (stream instanceof Ci.nsIScriptableInputStream) {
229 scriptableStream = stream;
231 scriptableStream = new lazy.ScriptableInputStream(stream);
236 // Don't exceed what's available on the stream
237 count = Math.min(count, stream.available());
// Stop as soon as the most recently read byte equals the delimiter, or when
// the remaining budget is no longer positive.
244 while (char !== delimiter && count > 0) {
// Read exactly one byte per iteration so nothing past the delimiter is
// consumed.
245 char = scriptableStream.readBytes(1);
253 export const StreamUtils = {