/** Chunk payloads supported by the stream transfer helpers in this file: text or raw bytes. */
type Data = string | Uint8Array;

/**
 * Shape of one read result as it is posted through the message port.
 * `done` flags the end of the stream; `value` holds the chunk and may be absent.
 */
type ChunkWithData<T> = {
    done: boolean;
    value?: T;
};
3 enum STREAM_CONTROL_TYPE {
/**
 * Transfer a readable stream chunk by chunk using message channels.
 *
 * `serialize` keeps the source stream in the current execution context and returns a
 * `MessagePort` through which the other side requests chunks; `deserialize` turns such a
 * port back into a `ReadableStream` that pulls its data over the port.
 */
export const ReadableStreamSerializer = {
    /**
     * Duck-type check for readable streams.
     *
     * @param obj - any value
     * @returns `true` only for non-null objects exposing a `getReader` function.
     *          `null` is rejected explicitly because `typeof null === 'object'`.
     */
    canHandle: (obj: unknown): obj is ReadableStream =>
        typeof obj === 'object' &&
        obj !== null &&
        typeof (obj as { getReader?: unknown }).getReader === 'function',

    /**
     * Expose `readableStream` through a message channel.
     *
     * @param readableStream - stream to expose; it is only locked once the first chunk is requested
     * @returns the port to hand over to the consuming execution context
     */
    serialize: (readableStream: ReadableStream<Uint8Array>): MessagePort => {
        const { port1, port2 } = new MessageChannel();

        // wait to get the reader until the first chunk is requested
        // in case the user wants to cancel the stream before starting reading it
        let reader: ReadableStreamDefaultReader<Uint8Array> | null = null;

        port1.onmessage = async ({ data: { type } }) => {
            switch (type) {
                case STREAM_CONTROL_TYPE.READ: {
                    if (reader === null) {
                        reader = readableStream.getReader();
                    }
                    // NOTE(review): a rejected read() is not forwarded to the remote side,
                    // which then keeps waiting for a reply — confirm whether the protocol
                    // should carry errors.
                    const dataChunk = await reader.read();
                    port1.postMessage(dataChunk, []); // no transferables
                    break;
                }
                case STREAM_CONTROL_TYPE.CANCEL:
                    // Once a reader holds the lock, the stream itself can no longer be
                    // cancelled directly: cancellation has to go through the reader.
                    if (reader === null) {
                        void readableStream.cancel();
                    } else {
                        void reader.cancel();
                    }
                    // Nothing else will be requested after a cancel: release the channel
                    port1.close();
                    break;
                default:
                    throw new Error('Unknown stream transfer control type');
            }
        };

        // Transfer the message channel to the caller's execution context
        return port2; // NB: the port is transferable and must be transferred
    },

    /**
     * Rebuild a readable stream from a port returned by `serialize`.
     *
     * @param port - port connected to the serializing side
     * @returns a stream that requests one remote chunk per `pull` and forwards `cancel`
     */
    deserialize: <T extends Data>(port: MessagePort): ReadableStream<T> => {
        // Convenience function to allow us to use async/await for messages coming down the port
        const nextPortMessage = () =>
            new Promise<ChunkWithData<T>>((resolve) => {
                port.onmessage = ({ data: chunk }: { data: ChunkWithData<T> }) => {
                    resolve(chunk);
                };
            });

        // Minimal proxy reader
        const portReader = {
            read: () => {
                // Install the reply listener before sending the request
                // promise that will resolve with the chunk returned by the remote reader
                const pendingChunk = nextPortMessage();
                port.postMessage({ type: STREAM_CONTROL_TYPE.READ });
                return pendingChunk;
            },
            cancel: () => {
                port.postMessage({ type: STREAM_CONTROL_TYPE.CANCEL });
            },
        };

        const reconstructedStream = new ReadableStream<T>({
            async pull(controller) {
                const { done, value } = await portReader.read();
                // When no more data needs to be consumed, close the stream
                if (done) {
                    // The final chunk has been received: the channel is no longer needed
                    port.close();
                    controller.close();
                    return;
                }
                // Enqueue the next data chunk into our target stream
                controller.enqueue(value);
            },
            cancel() {
                portReader.cancel();
            },
        });

        // // TODO? (not needed for now): make it iterable so it can be used in for-await-of statement
        // reconstructedStream[Symbol.asyncIterator] = () => portReader;

        return reconstructedStream;
    },
};
86 export type SerializeWebStreamTypes<T> = {
87 [I in keyof T]: T[I] extends ReadableStream<Data> | undefined ? MessagePort : T[I];