Backed out changeset b71c8c052463 (bug 1943846) for causing mass failures. CLOSED...
[gecko.git] / devtools / shared / transport / transport.js
blob460f45206f1f88d3a6e7da574202aacd33b1e783
1 /* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
5 "use strict";
7 const DevToolsUtils = require("resource://devtools/shared/DevToolsUtils.js");
8 const { dumpn, dumpv } = DevToolsUtils;
9 const flags = require("resource://devtools/shared/flags.js");
10 const StreamUtils = require("resource://devtools/shared/transport/stream-utils.js");
11 const {
12 Packet,
13 JSONPacket,
14 BulkPacket,
15 } = require("resource://devtools/shared/transport/packets.js");
17 loader.lazyGetter(this, "ScriptableInputStream", () => {
18 return Components.Constructor(
19 "@mozilla.org/scriptableinputstream;1",
20 "nsIScriptableInputStream",
21 "init"
23 });
25 const PACKET_HEADER_MAX = 200;
27 /**
28 * An adapter that handles data transfers between the devtools client and
29 * server. It can work with both nsIPipe and nsIServerSocket transports so
30 * long as the properly created input and output streams are specified.
31 * (However, for intra-process connections, LocalDebuggerTransport, below,
32 * is more efficient than using an nsIPipe pair with DebuggerTransport.)
34 * @param input nsIAsyncInputStream
35 * The input stream.
36 * @param output nsIAsyncOutputStream
37 * The output stream.
39 * Given a DebuggerTransport instance dt:
40 * 1) Set dt.hooks to a packet handler object (described below).
41 * 2) Call dt.ready() to begin watching for input packets.
42 * 3) Call dt.send() / dt.startBulkSend() to send packets.
43 * 4) Call dt.close() to close the connection, and disengage from the event
44 * loop.
46 * A packet handler is an object with the following methods:
48 * - onPacket(packet) - called when we have received a complete packet.
49 * |packet| is the parsed form of the packet --- a JavaScript value, not
50 * a JSON-syntax string.
52 * - onBulkPacket(packet) - called when we have switched to bulk packet
53 * receiving mode. |packet| is an object containing:
54 * * actor: Name of actor that will receive the packet
55 * * type: Name of actor's method that should be called on receipt
56 * * length: Size of the data to be read
57 * * stream: This input stream should only be used directly if you can ensure
58 * that you will read exactly |length| bytes and will not close the
59 * stream when reading is complete
60 * * done: If you use the stream directly (instead of |copyTo| below), you
61 * must signal completion by resolving / rejecting this deferred.
62 * If it's rejected, the transport will be closed. If an Error is
63 * supplied as a rejection value, it will be logged via |dumpn|.
64 * If you do use |copyTo|, resolving is taken care of for you when
65 * copying completes.
66 * * copyTo: A helper function for getting your data out of the stream that
67 * meets the stream handling requirements above, and has the
68 * following signature:
69 * @param output nsIAsyncOutputStream
70 * The stream to copy to.
71 * @return Promise
72 * The promise is resolved when copying completes or rejected if any
73 * (unexpected) errors occur.
74 * This object also emits "progress" events for each chunk that is
75 * copied. See stream-utils.js.
77 * - onTransportClosed(reason) - called when the connection is closed. |reason| is
78 * an optional nsresult or object, typically passed when the transport is
79 * closed due to some error in a underlying stream.
81 * See ./packets.js and the Remote Debugging Protocol specification for more
82 * details on the format of these packets.
84 function DebuggerTransport(input, output) {
85 this._input = input;
86 this._scriptableInput = new ScriptableInputStream(input);
87 this._output = output;
89 // The current incoming (possibly partial) header, which will determine which
90 // type of Packet |_incoming| below will become.
91 this._incomingHeader = "";
92 // The current incoming Packet object
93 this._incoming = null;
94 // A queue of outgoing Packet objects
95 this._outgoing = [];
97 this.hooks = null;
98 this.active = false;
100 this._incomingEnabled = true;
101 this._outgoingEnabled = true;
103 this.close = this.close.bind(this);
106 DebuggerTransport.prototype = {
108 * Transmit an object as a JSON packet.
110 * This method returns immediately, without waiting for the entire
111 * packet to be transmitted, registering event handlers as needed to
112 * transmit the entire packet. Packets are transmitted in the order
113 * they are passed to this method.
115 send(object) {
116 const packet = new JSONPacket(this);
117 packet.object = object;
118 this._outgoing.push(packet);
119 this._flushOutgoing();
123 * Transmit streaming data via a bulk packet.
125 * This method initiates the bulk send process by queuing up the header data.
126 * The caller receives eventual access to a stream for writing.
128 * N.B.: Do *not* attempt to close the stream handed to you, as it will
129 * continue to be used by this transport afterwards. Most users should
130 * instead use the provided |copyFrom| function instead.
132 * @param header Object
133 * This is modeled after the format of JSON packets above, but does not
134 * actually contain the data, but is instead just a routing header:
135 * * actor: Name of actor that will receive the packet
136 * * type: Name of actor's method that should be called on receipt
137 * * length: Size of the data to be sent
138 * @return Promise
139 * The promise will be resolved when you are allowed to write to the
140 * stream with an object containing:
141 * * stream: This output stream should only be used directly if
142 * you can ensure that you will write exactly |length|
143 * bytes and will not close the stream when writing is
144 * complete
145 * * done: If you use the stream directly (instead of |copyFrom|
146 * below), you must signal completion by resolving /
147 * rejecting this deferred. If it's rejected, the
148 * transport will be closed. If an Error is supplied as
149 * a rejection value, it will be logged via |dumpn|. If
150 * you do use |copyFrom|, resolving is taken care of for
151 * you when copying completes.
152 * * copyFrom: A helper function for getting your data onto the
153 * stream that meets the stream handling requirements
154 * above, and has the following signature:
155 * @param input nsIAsyncInputStream
156 * The stream to copy from.
157 * @return Promise
158 * The promise is resolved when copying completes or
159 * rejected if any (unexpected) errors occur.
160 * This object also emits "progress" events for each chunk
161 * that is copied. See stream-utils.js.
163 startBulkSend(header) {
164 const packet = new BulkPacket(this);
165 packet.header = header;
166 this._outgoing.push(packet);
167 this._flushOutgoing();
168 return packet.streamReadyForWriting;
172 * Close the transport.
173 * @param reason nsresult / object (optional)
174 * The status code or error message that corresponds to the reason for
175 * closing the transport (likely because a stream closed or failed).
177 close(reason) {
178 this.active = false;
179 this._input.close();
180 this._scriptableInput.close();
181 this._output.close();
182 this._destroyIncoming();
183 this._destroyAllOutgoing();
184 if (this.hooks) {
185 this.hooks.onTransportClosed(reason);
186 this.hooks = null;
188 if (reason) {
189 dumpn("Transport closed: " + DevToolsUtils.safeErrorString(reason));
190 } else {
191 dumpn("Transport closed.");
196 * The currently outgoing packet (at the top of the queue).
198 get _currentOutgoing() {
199 return this._outgoing[0];
203 * Flush data to the outgoing stream. Waits until the output stream notifies
204 * us that it is ready to be written to (via onOutputStreamReady).
206 _flushOutgoing() {
207 if (!this._outgoingEnabled || this._outgoing.length === 0) {
208 return;
211 // If the top of the packet queue has nothing more to send, remove it.
212 if (this._currentOutgoing.done) {
213 this._finishCurrentOutgoing();
216 if (this._outgoing.length) {
217 const threadManager = Cc["@mozilla.org/thread-manager;1"].getService();
218 this._output.asyncWait(this, 0, 0, threadManager.currentThread);
223 * Pause this transport's attempts to write to the output stream. This is
224 * used when we've temporarily handed off our output stream for writing bulk
225 * data.
227 pauseOutgoing() {
228 this._outgoingEnabled = false;
232 * Resume this transport's attempts to write to the output stream.
234 resumeOutgoing() {
235 this._outgoingEnabled = true;
236 this._flushOutgoing();
239 // nsIOutputStreamCallback
241 * This is called when the output stream is ready for more data to be written.
242 * The current outgoing packet will attempt to write some amount of data, but
243 * may not complete.
245 onOutputStreamReady: DevToolsUtils.makeInfallible(function (stream) {
246 if (!this._outgoingEnabled || this._outgoing.length === 0) {
247 return;
250 try {
251 this._currentOutgoing.write(stream);
252 } catch (e) {
253 if (e.result != Cr.NS_BASE_STREAM_WOULD_BLOCK) {
254 this.close(e.result);
255 return;
257 throw e;
260 this._flushOutgoing();
261 }, "DebuggerTransport.prototype.onOutputStreamReady"),
264 * Remove the current outgoing packet from the queue upon completion.
266 _finishCurrentOutgoing() {
267 if (this._currentOutgoing) {
268 this._currentOutgoing.destroy();
269 this._outgoing.shift();
274 * Clear the entire outgoing queue.
276 _destroyAllOutgoing() {
277 for (const packet of this._outgoing) {
278 packet.destroy();
280 this._outgoing = [];
284 * Initialize the input stream for reading. Once this method has been called,
285 * we watch for packets on the input stream, and pass them to the appropriate
286 * handlers via this.hooks.
288 ready() {
289 this.active = true;
290 this._waitForIncoming();
294 * Asks the input stream to notify us (via onInputStreamReady) when it is
295 * ready for reading.
297 _waitForIncoming() {
298 if (this._incomingEnabled) {
299 const threadManager = Cc["@mozilla.org/thread-manager;1"].getService();
300 this._input.asyncWait(this, 0, 0, threadManager.currentThread);
305 * Pause this transport's attempts to read from the input stream. This is
306 * used when we've temporarily handed off our input stream for reading bulk
307 * data.
309 pauseIncoming() {
310 this._incomingEnabled = false;
314 * Resume this transport's attempts to read from the input stream.
316 resumeIncoming() {
317 this._incomingEnabled = true;
318 this._flushIncoming();
319 this._waitForIncoming();
322 // nsIInputStreamCallback
324 * Called when the stream is either readable or closed.
326 onInputStreamReady: DevToolsUtils.makeInfallible(function (stream) {
327 try {
328 while (
329 stream.available() &&
330 this._incomingEnabled &&
331 this._processIncoming(stream, stream.available())
333 // Loop until there is nothing more to process
335 this._waitForIncoming();
336 } catch (e) {
337 if (e.result != Cr.NS_BASE_STREAM_WOULD_BLOCK) {
338 this.close(e.result);
339 } else {
340 throw e;
343 }, "DebuggerTransport.prototype.onInputStreamReady"),
346 * Process the incoming data. Will create a new currently incoming Packet if
347 * needed. Tells the incoming Packet to read as much data as it can, but
348 * reading may not complete. The Packet signals that its data is ready for
349 * delivery by calling one of this transport's _on*Ready methods (see
350 * ./packets.js and the _on*Ready methods below).
351 * @return boolean
352 * Whether incoming stream processing should continue for any
353 * remaining data.
355 _processIncoming(stream, count) {
356 dumpv("Data available: " + count);
358 if (!count) {
359 dumpv("Nothing to read, skipping");
360 return false;
363 try {
364 if (!this._incoming) {
365 dumpv("Creating a new packet from incoming");
367 if (!this._readHeader(stream)) {
368 // Not enough data to read packet type
369 return false;
372 // Attempt to create a new Packet by trying to parse each possible
373 // header pattern.
374 this._incoming = Packet.fromHeader(this._incomingHeader, this);
375 if (!this._incoming) {
376 throw new Error(
377 "No packet types for header: " + this._incomingHeader
382 if (!this._incoming.done) {
383 // We have an incomplete packet, keep reading it.
384 dumpv("Existing packet incomplete, keep reading");
385 this._incoming.read(stream, this._scriptableInput);
387 } catch (e) {
388 const msg =
389 "Error reading incoming packet: (" + e + " - " + e.stack + ")";
390 dumpn(msg);
392 // Now in an invalid state, shut down the transport.
393 this.close();
394 return false;
397 if (!this._incoming.done) {
398 // Still not complete, we'll wait for more data.
399 dumpv("Packet not done, wait for more");
400 return true;
403 // Ready for next packet
404 this._flushIncoming();
405 return true;
409 * Read as far as we can into the incoming data, attempting to build up a
410 * complete packet header (which terminates with ":"). We'll only read up to
411 * PACKET_HEADER_MAX characters.
412 * @return boolean
413 * True if we now have a complete header.
415 _readHeader() {
416 const amountToRead = PACKET_HEADER_MAX - this._incomingHeader.length;
417 this._incomingHeader += StreamUtils.delimitedRead(
418 this._scriptableInput,
419 ":",
420 amountToRead
422 if (flags.wantVerbose) {
423 dumpv("Header read: " + this._incomingHeader);
426 if (this._incomingHeader.endsWith(":")) {
427 if (flags.wantVerbose) {
428 dumpv("Found packet header successfully: " + this._incomingHeader);
430 return true;
433 if (this._incomingHeader.length >= PACKET_HEADER_MAX) {
434 throw new Error("Failed to parse packet header!");
437 // Not enough data yet.
438 return false;
442 * If the incoming packet is done, log it as needed and clear the buffer.
444 _flushIncoming() {
445 if (!this._incoming.done) {
446 return;
448 if (flags.wantLogging) {
449 dumpn("Got: " + this._incoming);
451 this._destroyIncoming();
455 * Handler triggered by an incoming JSONPacket completing it's |read| method.
456 * Delivers the packet to this.hooks.onPacket.
458 _onJSONObjectReady(object) {
459 DevToolsUtils.executeSoon(
460 DevToolsUtils.makeInfallible(() => {
461 // Ensure the transport is still alive by the time this runs.
462 if (this.active) {
463 this.hooks.onPacket(object);
465 }, "DebuggerTransport instance's this.hooks.onPacket")
470 * Handler triggered by an incoming BulkPacket entering the |read| phase for
471 * the stream portion of the packet. Delivers info about the incoming
472 * streaming data to this.hooks.onBulkPacket. See the main comment on the
473 * transport at the top of this file for more details.
475 _onBulkReadReady(...args) {
476 DevToolsUtils.executeSoon(
477 DevToolsUtils.makeInfallible(() => {
478 // Ensure the transport is still alive by the time this runs.
479 if (this.active) {
480 this.hooks.onBulkPacket(...args);
482 }, "DebuggerTransport instance's this.hooks.onBulkPacket")
487 * Remove all handlers and references related to the current incoming packet,
488 * either because it is now complete or because the transport is closing.
490 _destroyIncoming() {
491 if (this._incoming) {
492 this._incoming.destroy();
494 this._incomingHeader = "";
495 this._incoming = null;
499 exports.DebuggerTransport = DebuggerTransport;