Merge branch 'fix/sentry-issue' into 'main'
[ProtonMail-WebClient.git] / packages / shared / lib / calendar / import / encryptAndSubmit.ts
blobceb950968c7bf94a05115d65b1162cbeaafe4da8
1 import { getHasSharedEventContent, getHasSharedKeyPacket } from '@proton/shared/lib/calendar/apiModels';
2 import chunk from '@proton/utils/chunk';
4 import { syncMultipleEvents } from '../../api/calendars';
5 import { HOUR, SECOND } from '../../constants';
6 import { HTTP_ERROR_CODES } from '../../errors';
7 import { wait } from '../../helpers/promise';
8 import type { Api, DecryptedKey } from '../../interfaces';
9 import type {
10     DecryptedCalendarKey,
11     EncryptedEvent,
12     ImportedEvent,
13     SyncMultipleApiResponse,
14     SyncMultipleApiResponses,
15     VcalVeventComponent,
16 } from '../../interfaces/calendar';
17 import type { CreateCalendarEventSyncData } from '../../interfaces/calendar/Api';
18 import { DEFAULT_ATTENDEE_PERMISSIONS } from '../constants';
19 import { getCreationKeys } from '../crypto/keys/helpers';
20 import { getIsSuccessSyncApiResponse } from '../helper';
21 import { IMPORT_EVENT_ERROR_TYPE, ImportEventError } from '../icsSurgery/ImportEventError';
22 import { createCalendarEvent } from '../serialize';
23 import { prodId } from '../vcalConfig';
24 import { getComponentIdentifier, splitErrors } from './import';
26 const BATCH_SIZE = 10;
28 const encryptEvent = async ({
29     eventComponent,
30     addressKeys,
31     calendarKeys,
32     hasDefaultNotifications,
33 }: {
34     eventComponent: VcalVeventComponent;
35     addressKeys: DecryptedKey[];
36     calendarKeys: DecryptedCalendarKey[];
37     hasDefaultNotifications: boolean;
38 }) => {
39     const componentId = getComponentIdentifier(eventComponent);
40     try {
41         const data = await createCalendarEvent({
42             eventComponent,
43             isCreateEvent: true,
44             isSwitchCalendar: false,
45             hasDefaultNotifications,
46             ...(await getCreationKeys({ newAddressKeys: addressKeys, newCalendarKeys: calendarKeys })),
47         });
48         if (!getHasSharedKeyPacket(data) || !getHasSharedEventContent(data)) {
49             throw new Error('Missing shared data');
50         }
51         return { data, component: eventComponent };
52     } catch (error: any) {
53         return new ImportEventError({
54             errorType: IMPORT_EVENT_ERROR_TYPE.ENCRYPTION_ERROR,
55             componentIdentifiers: { component: 'vevent', componentId, prodId, domain: 'proton.me' },
56         });
57     }
60 const submitEvents = async (
61     events: EncryptedEvent[],
62     calendarID: string,
63     memberID: string,
64     api: Api,
65     overwrite?: boolean,
66     withJails?: boolean
67 ): Promise<SyncMultipleApiResponses[]> => {
68     try {
69         const Events = events.map(
70             ({ data }): CreateCalendarEventSyncData => ({
71                 Overwrite: overwrite ? 1 : 0,
72                 Event: { Permissions: DEFAULT_ATTENDEE_PERMISSIONS, ...data },
73             })
74         );
75         const { Responses } = await api<SyncMultipleApiResponse>({
76             ...syncMultipleEvents(calendarID, { MemberID: memberID, IsImport: 1, Events }),
77             timeout: HOUR,
78             silence: true,
79             ignoreHandler: withJails ? [HTTP_ERROR_CODES.TOO_MANY_REQUESTS] : undefined,
80         });
81         return Responses;
82     } catch (error: any) {
83         if (withJails && error?.status === HTTP_ERROR_CODES.TOO_MANY_REQUESTS) {
84             throw error;
85         }
86         return events.map((event, index) => ({
87             Index: index,
88             Response: { Code: 0, Error: `${error}` },
89         }));
90     }
93 const processResponses = (responses: SyncMultipleApiResponses[], events: EncryptedEvent[]) => {
94     return responses.map((response): ImportedEvent | ImportEventError => {
95         const {
96             Index,
97             Response: { Error: errorMessage },
98         } = response;
99         if (getIsSuccessSyncApiResponse(response)) {
100             return {
101                 ...events[Index],
102                 response,
103             };
104         }
105         const error = new Error(errorMessage);
106         const component = events[Index]?.component;
107         const componentId = component ? getComponentIdentifier(component) : '';
108         return new ImportEventError({
109             errorType: IMPORT_EVENT_ERROR_TYPE.EXTERNAL_ERROR,
110             componentIdentifiers: { component: 'vevent', componentId, prodId, domain: 'proton.me' },
111             externalError: error,
112         });
113     });
116 interface ProcessData {
117     events: { eventComponent: VcalVeventComponent; hasDefaultNotifications: boolean }[];
118     calendarID: string;
119     memberID: string;
120     addressKeys: DecryptedKey[];
121     calendarKeys: DecryptedCalendarKey[];
122     api: Api;
123     overwrite?: boolean;
124     signal?: AbortSignal;
125     onProgress?: (encrypted: EncryptedEvent[], imported: EncryptedEvent[], errors: ImportEventError[]) => void;
128 export const processInBatches = async ({
129     events,
130     calendarID,
131     memberID,
132     addressKeys,
133     calendarKeys,
134     api,
135     overwrite = true,
136     signal,
137     onProgress,
138 }: ProcessData) => {
139     const batches = chunk(events, BATCH_SIZE);
140     const promises = [];
141     const imported: ImportedEvent[][] = [];
142     const errored: ImportEventError[][] = [];
144     for (let i = 0; i < batches.length; i++) {
145         // The API requests limit for the submit route is 40 calls per 10 seconds
146         // We play it safe by enforcing a 300ms minimum wait between API calls. During this wait we encrypt the events
147         if (signal?.aborted) {
148             return {
149                 importedEvents: [],
150                 importErrors: [],
151             };
152         }
153         const batchedEvents = batches[i];
154         const [result] = await Promise.all([
155             Promise.all(
156                 batchedEvents.map(({ eventComponent, hasDefaultNotifications }) =>
157                     encryptEvent({
158                         eventComponent,
159                         addressKeys,
160                         calendarKeys,
161                         hasDefaultNotifications,
162                     })
163                 )
164             ),
165             wait(300),
166         ]);
167         const { errors, rest: encrypted } = splitErrors(result);
168         if (signal?.aborted) {
169             return {
170                 importedEvents: [],
171                 importErrors: [],
172             };
173         }
174         onProgress?.(encrypted, [], errors);
175         if (errors.length) {
176             errored.push(errors);
177         }
178         if (encrypted.length) {
179             const promise = submitEvents(encrypted, calendarID, memberID, api, overwrite).then((responses) => {
180                 const processedResponses = processResponses(responses, encrypted);
181                 const { errors, rest: importedSuccess } = splitErrors(processedResponses);
182                 imported.push(importedSuccess);
183                 errored.push(errors);
184                 if (!signal?.aborted) {
185                     onProgress?.([], importedSuccess, errors);
186                 }
187             });
188             promises.push(promise);
189         }
190     }
191     await Promise.all(promises);
193     return {
194         importedEvents: imported.flat(),
195         importErrors: errored.flat(),
196     };
200  * The following helper works as follows:
201  * * We encrypt and submit in parallel. As events are encrypted (in batches), they are moved to the import queue.
202  * * Batches of encrypted events are submitted at a constant rate
203  *   (which under normal circumstances should be jail-safe).
204  * * If a jail is hit, all ongoing submissions are paused and we wait a retry-after period
205  *   (defined as the max of all possible retry-after received from those submissions).
206  * * The submission process is resumed at a lower rate
207  * */
208 export const processWithJails = async ({
209     events,
210     calendarID,
211     memberID,
212     overwrite = true,
213     addressKeys,
214     calendarKeys,
215     api,
216     signal,
217     onProgress,
218 }: ProcessData) => {
219     const queueToEncrypt = chunk(events, BATCH_SIZE);
220     const queueToImport: EncryptedEvent[][] = [];
221     const imported: ImportedEvent[][] = [];
222     const errored: ImportEventError[][] = [];
224     // The API requests limit for the submit route is normally 40 calls per 10 seconds
225     // We start with a relax period that respects this limit.
226     let relaxTime = 300;
228     const encrypt = async () => {
229         while (queueToEncrypt.length && !signal?.aborted) {
230             const [eventsToEncrypt] = queueToEncrypt;
231             const result = await Promise.all(
232                 eventsToEncrypt.map(({ eventComponent, hasDefaultNotifications }) =>
233                     encryptEvent({
234                         eventComponent,
235                         hasDefaultNotifications,
236                         addressKeys,
237                         calendarKeys,
238                     })
239                 )
240             );
241             queueToEncrypt.splice(0, 1);
242             const { errors, rest: encrypted } = splitErrors(result);
243             queueToImport.push(encrypted);
244             if (!signal?.aborted) {
245                 onProgress?.(encrypted, [], errors);
246             }
247             if (errors.length) {
248                 errored.push(errors);
249             }
250         }
251     };
253     const submit = async (): Promise<void> => {
254         let paused = false;
255         const retryAfters: number[] = [];
256         const promises = [];
258         while ((queueToImport.length || queueToEncrypt.length) && !signal?.aborted && !paused) {
259             const [eventsToImport] = queueToImport;
260             if (!eventsToImport) {
261                 // encryption might not be finished yet, give it some time
262                 await wait(relaxTime);
263                 return submit();
264             }
265             queueToImport.splice(0, 1);
266             promises.push(
267                 submitEvents(eventsToImport, calendarID, memberID, api, overwrite, true)
268                     .then((responses) => {
269                         const processedResponses = processResponses(responses, eventsToImport);
270                         const { errors, rest: importedSuccess } = splitErrors(processedResponses);
271                         imported.push(importedSuccess);
272                         errored.push(errors);
273                         if (!signal?.aborted) {
274                             onProgress?.([], importedSuccess, errors);
275                         }
276                     })
277                     // it should be safe to change the value of paused in this loop because it can only be changed to true
278                     // eslint-disable-next-line @typescript-eslint/no-loop-func
279                     .catch((error: any) => {
280                         // the only error we can get here is the TOO_MANY_REQUESTS one. All others are caught by submitEvents
281                         paused = true;
282                         queueToImport.push(eventsToImport);
283                         retryAfters.push(parseInt(error?.response?.headers.get('retry-after') || '0', 10) * SECOND);
284                     })
285             );
287             await wait(relaxTime);
288         }
290         // wait until all ongoing promises are finished
291         await Promise.all(promises);
293         if (paused) {
294             // A jail was hit. Wait for a safe retry after period, then resume the process at a lower rate
295             await wait(Math.max(...retryAfters));
296             relaxTime *= 1.5;
297             return submit();
298         }
299     };
301     await Promise.all([encrypt(), submit()]);
303     return {
304         importedEvents: imported.flat(),
305         importErrors: errored.flat(),
306     };