Cleanup - unused files / unused exports / duplicate exports
[ProtonMail-WebClient.git] / packages / crypto / lib / worker / transferHandlers / streamHandler.ts
blob09cf222fa1d4a2e760631d0b0e51507f50c764f3
1 type Data = Uint8Array | string;
2 type ChunkWithData<T> = { done: boolean; value?: T };
3 enum STREAM_CONTROL_TYPE {
4     'READ',
5     'CANCEL',
7 // Transfer a readable stream chunk by chunk using message channels
8 export const ReadableStreamSerializer = {
9     canHandle: (obj: any): obj is ReadableStream => typeof obj === 'object' && obj.getReader,
10     serialize: (readableStream: ReadableStream<Uint8Array>): MessagePort => {
11         const { port1, port2 } = new MessageChannel();
13         // wait to get the reader until the first chunk is requested
14         // in case the user wants to cancel the stream before starting reading it
15         let reader: ReadableStreamDefaultReader<Uint8Array> | null = null;
17         port1.onmessage = async ({ data: { type } }) => {
18             switch (type) {
19                 case STREAM_CONTROL_TYPE.READ:
20                     if (reader === null) {
21                         reader = readableStream.getReader();
22                     }
23                     const dataChunk = await reader.read();
24                     port1.postMessage(dataChunk, []); // no transferables
25                     break;
26                 case STREAM_CONTROL_TYPE.CANCEL:
27                     if (reader) {
28                         void reader.cancel();
29                     } else {
30                         void readableStream.cancel();
31                     }
32                     break;
33                 default:
34                     throw new Error('Unknown stream transfer control type');
35             }
36         };
38         // Transfer the message channel to the caller's execution context
39         return port2; // NB: the port is transferable and must be transferred
40     },
41     deserialize: <T extends Data>(port: MessagePort): ReadableStream<T> => {
42         // Convenience function to allow us to use async/await for messages coming down the port
43         const nextPortMessage = () =>
44             new Promise<ChunkWithData<T>>((resolve) => {
45                 port.onmessage = ({ data: chunk }: { data: ChunkWithData<T> }) => {
46                     resolve(chunk);
47                 };
48             });
50         // Minimal proxy reader
51         const portReader = {
52             read: () => {
53                 port.postMessage({ type: STREAM_CONTROL_TYPE.READ });
54                 // promise that will resolve with the chunk returned by the remote reader
55                 return nextPortMessage();
56             },
58             cancel: () => {
59                 port.postMessage({ type: STREAM_CONTROL_TYPE.CANCEL });
60             },
61         };
63         const reconstructedStream = new ReadableStream<T>({
64             async pull(controller) {
65                 const { done, value } = await portReader.read();
66                 // When no more data needs to be consumed, close the stream
67                 if (done) {
68                     controller.close();
69                     return;
70                 }
71                 // Enqueue the next data chunk into our target stream
72                 controller.enqueue(value);
73             },
74             cancel() {
75                 portReader.cancel();
76             },
77         });
79         // // TODO? (not needed for now): make it iterable so it can be used in for-await-of statement
80         // reconstructedStream[Symbol.asyncIterator] = () => portReader;
82         return reconstructedStream;
83     },
86 export type SerializeWebStreamTypes<T> = {
87     [I in keyof T]: T[I] extends ReadableStream<Data> | undefined ? MessagePort : T[I];