1 import { getHasSharedEventContent, getHasSharedKeyPacket } from '@proton/shared/lib/calendar/apiModels';
2 import chunk from '@proton/utils/chunk';
4 import { syncMultipleEvents } from '../../api/calendars';
5 import { HOUR, SECOND } from '../../constants';
6 import { HTTP_ERROR_CODES } from '../../errors';
7 import { wait } from '../../helpers/promise';
8 import type { Api, DecryptedKey } from '../../interfaces';
13 SyncMultipleApiResponse,
14 SyncMultipleApiResponses,
16 } from '../../interfaces/calendar';
17 import type { CreateCalendarEventSyncData } from '../../interfaces/calendar/Api';
18 import { DEFAULT_ATTENDEE_PERMISSIONS } from '../constants';
19 import { getCreationKeys } from '../crypto/keys/helpers';
20 import { getIsSuccessSyncApiResponse } from '../helper';
21 import { IMPORT_EVENT_ERROR_TYPE, ImportEventError } from '../icsSurgery/ImportEventError';
22 import { createCalendarEvent } from '../serialize';
23 import { prodId } from '../vcalConfig';
24 import { getComponentIdentifier, splitErrors } from './import';
26 const BATCH_SIZE = 10;
28 const encryptEvent = async ({
32 hasDefaultNotifications,
34 eventComponent: VcalVeventComponent;
35 addressKeys: DecryptedKey[];
36 calendarKeys: DecryptedCalendarKey[];
37 hasDefaultNotifications: boolean;
39 const componentId = getComponentIdentifier(eventComponent);
41 const data = await createCalendarEvent({
44 isSwitchCalendar: false,
45 hasDefaultNotifications,
46 ...(await getCreationKeys({ newAddressKeys: addressKeys, newCalendarKeys: calendarKeys })),
48 if (!getHasSharedKeyPacket(data) || !getHasSharedEventContent(data)) {
49 throw new Error('Missing shared data');
51 return { data, component: eventComponent };
52 } catch (error: any) {
53 return new ImportEventError({
54 errorType: IMPORT_EVENT_ERROR_TYPE.ENCRYPTION_ERROR,
55 componentIdentifiers: { component: 'vevent', componentId, prodId, domain: 'proton.me' },
60 const submitEvents = async (
61 events: EncryptedEvent[],
67 ): Promise<SyncMultipleApiResponses[]> => {
69 const Events = events.map(
70 ({ data }): CreateCalendarEventSyncData => ({
71 Overwrite: overwrite ? 1 : 0,
72 Event: { Permissions: DEFAULT_ATTENDEE_PERMISSIONS, ...data },
75 const { Responses } = await api<SyncMultipleApiResponse>({
76 ...syncMultipleEvents(calendarID, { MemberID: memberID, IsImport: 1, Events }),
79 ignoreHandler: withJails ? [HTTP_ERROR_CODES.TOO_MANY_REQUESTS] : undefined,
82 } catch (error: any) {
83 if (withJails && error?.status === HTTP_ERROR_CODES.TOO_MANY_REQUESTS) {
86 return events.map((event, index) => ({
88 Response: { Code: 0, Error: `${error}` },
93 const processResponses = (responses: SyncMultipleApiResponses[], events: EncryptedEvent[]) => {
94 return responses.map((response): ImportedEvent | ImportEventError => {
97 Response: { Error: errorMessage },
99 if (getIsSuccessSyncApiResponse(response)) {
105 const error = new Error(errorMessage);
106 const component = events[Index]?.component;
107 const componentId = component ? getComponentIdentifier(component) : '';
108 return new ImportEventError({
109 errorType: IMPORT_EVENT_ERROR_TYPE.EXTERNAL_ERROR,
110 componentIdentifiers: { component: 'vevent', componentId, prodId, domain: 'proton.me' },
111 externalError: error,
116 interface ProcessData {
117 events: { eventComponent: VcalVeventComponent; hasDefaultNotifications: boolean }[];
120 addressKeys: DecryptedKey[];
121 calendarKeys: DecryptedCalendarKey[];
124 signal?: AbortSignal;
125 onProgress?: (encrypted: EncryptedEvent[], imported: EncryptedEvent[], errors: ImportEventError[]) => void;
128 export const processInBatches = async ({
139 const batches = chunk(events, BATCH_SIZE);
141 const imported: ImportedEvent[][] = [];
142 const errored: ImportEventError[][] = [];
144 for (let i = 0; i < batches.length; i++) {
145 // The API requests limit for the submit route is 40 calls per 10 seconds
146 // We play it safe by enforcing a 300ms minimum wait between API calls. During this wait we encrypt the events
147 if (signal?.aborted) {
153 const batchedEvents = batches[i];
154 const [result] = await Promise.all([
156 batchedEvents.map(({ eventComponent, hasDefaultNotifications }) =>
161 hasDefaultNotifications,
167 const { errors, rest: encrypted } = splitErrors(result);
168 if (signal?.aborted) {
174 onProgress?.(encrypted, [], errors);
176 errored.push(errors);
178 if (encrypted.length) {
179 const promise = submitEvents(encrypted, calendarID, memberID, api, overwrite).then((responses) => {
180 const processedResponses = processResponses(responses, encrypted);
181 const { errors, rest: importedSuccess } = splitErrors(processedResponses);
182 imported.push(importedSuccess);
183 errored.push(errors);
184 if (!signal?.aborted) {
185 onProgress?.([], importedSuccess, errors);
188 promises.push(promise);
191 await Promise.all(promises);
194 importedEvents: imported.flat(),
195 importErrors: errored.flat(),
200 * The following helper works as follows:
201 * * We encrypt and submit in parallel. As events are encrypted (in batches), they are moved to the import queue.
202 * * Batches of encrypted events are submitted at a constant rate
203 * (which under normal circumstances should be jail-safe).
204 * * If a jail is hit, all ongoing submissions are paused and we wait a retry-after period
205 * (defined as the max of all possible retry-after received from those submissions).
206 * * The submission process is resumed at a lower rate
208 export const processWithJails = async ({
219 const queueToEncrypt = chunk(events, BATCH_SIZE);
220 const queueToImport: EncryptedEvent[][] = [];
221 const imported: ImportedEvent[][] = [];
222 const errored: ImportEventError[][] = [];
224 // The API requests limit for the submit route is normally 40 calls per 10 seconds
225 // We start with a relax period that respects this limit.
228 const encrypt = async () => {
229 while (queueToEncrypt.length && !signal?.aborted) {
230 const [eventsToEncrypt] = queueToEncrypt;
231 const result = await Promise.all(
232 eventsToEncrypt.map(({ eventComponent, hasDefaultNotifications }) =>
235 hasDefaultNotifications,
241 queueToEncrypt.splice(0, 1);
242 const { errors, rest: encrypted } = splitErrors(result);
243 queueToImport.push(encrypted);
244 if (!signal?.aborted) {
245 onProgress?.(encrypted, [], errors);
248 errored.push(errors);
253 const submit = async (): Promise<void> => {
255 const retryAfters: number[] = [];
258 while ((queueToImport.length || queueToEncrypt.length) && !signal?.aborted && !paused) {
259 const [eventsToImport] = queueToImport;
260 if (!eventsToImport) {
261 // encryption might not be finished yet, give it some time
262 await wait(relaxTime);
265 queueToImport.splice(0, 1);
267 submitEvents(eventsToImport, calendarID, memberID, api, overwrite, true)
268 .then((responses) => {
269 const processedResponses = processResponses(responses, eventsToImport);
270 const { errors, rest: importedSuccess } = splitErrors(processedResponses);
271 imported.push(importedSuccess);
272 errored.push(errors);
273 if (!signal?.aborted) {
274 onProgress?.([], importedSuccess, errors);
277 // it should be safe to change the value of paused in this loop because it can only be changed to true
278 // eslint-disable-next-line @typescript-eslint/no-loop-func
279 .catch((error: any) => {
280 // the only error we can get here is the TOO_MANY_REQUESTS one. All others are caught by submitEvents
282 queueToImport.push(eventsToImport);
283 retryAfters.push(parseInt(error?.response?.headers.get('retry-after') || '0', 10) * SECOND);
287 await wait(relaxTime);
290 // wait until all ongoing promises are finished
291 await Promise.all(promises);
294 // A jail was hit. Wait for a safe retry after period, then resume the process at a lower rate
295 await wait(Math.max(...retryAfters));
301 await Promise.all([encrypt(), submit()]);
304 importedEvents: imported.flat(),
305 importErrors: errored.flat(),