Merge branch 'renovate/all-minor-patch' into 'main'
[ProtonMail-WebClient.git] / packages / docs-core / lib / RealtimeController / RealtimeController.ts
blob903d97b9ad7c6a850134210dadb2d04611744af8
1 import { c } from 'ttag'
2 import type { WebsocketConnectionInterface } from '@proton/docs-shared'
3 import {
4   assertUnreachableAndLog,
5   DecryptedMessage,
6   ProcessedIncomingRealtimeEventMessage,
7   Result,
8   type BroadcastSource,
9   type InternalEventBusInterface,
10   type InternalEventHandlerInterface,
11   type InternalEventInterface,
12   type RtsMessagePayload,
13 } from '@proton/docs-shared'
14 import type { WebsocketServiceInterface } from '../Services/Websockets/WebsocketServiceInterface'
15 import type { DocumentEntitlements } from '../Types/DocumentEntitlements'
16 import { WebsocketConnectionEvent } from '../Realtime/WebsocketEvent/WebsocketConnectionEvent'
17 import type { WebsocketConnectionEventPayloads } from '../Realtime/WebsocketEvent/WebsocketConnectionEventPayloads'
18 import { ConnectionCloseReason, EventTypeEnum } from '@proton/docs-proto'
19 import { utf8ArrayToString } from '@proton/crypto/lib/utils'
20 import { DocSizeTracker } from '../SizeTracker/SizeTracker'
21 import { PostApplicationError } from '../Application/ApplicationEvent'
22 import type { RealtimeControllerInterface } from './RealtimeControllerInterface'
23 import type { DocumentState, PublicDocumentState } from '../State/DocumentState'
24 import type { LoggerInterface } from '@proton/utils/logs'
25 import metrics from '@proton/metrics'
26 import type { DocControllerEventPayloads } from '../AuthenticatedDocController/AuthenticatedDocControllerEvent'
27 import { DocControllerEvent } from '../AuthenticatedDocController/AuthenticatedDocControllerEvent'
28 import type { GetDocumentMeta } from '../UseCase/GetDocumentMeta'
29 import type { LoadCommit } from '../UseCase/LoadCommit'
31 /**
32  * @TODO DRVDOC-802
33  * This should be an upper bound we should not expect to hit, because we expect the RTS to tell us it has given us all updates
34  * in a timely manner. However, due to DRVDOC-802, this event is not currently received, so we have lowered this value to something
35  * nominal as a temporary workaround.
36  */
37 export const MAX_MS_TO_WAIT_FOR_RTS_SYNC_AFTER_CONNECT = 100
38 export const MAX_MS_TO_WAIT_FOR_RTS_CONNECTION_BEFORE_DISPLAYING_EDITOR = 3_000
40 export class RealtimeController implements InternalEventHandlerInterface, RealtimeControllerInterface {
41   initialSyncTimer: ReturnType<typeof setTimeout> | null = null
42   initialConnectionTimer: ReturnType<typeof setTimeout> | null = null
44   isDestroyed = false
45   abortWebsocketConnectionAttempt = false
46   sizeTracker: DocSizeTracker = new DocSizeTracker()
47   isRefetchingStaleCommit = false
49   readonly updatesReceivedWhileParentNotReady: (DecryptedMessage | ProcessedIncomingRealtimeEventMessage)[] = []
51   constructor(
52     readonly websocketService: WebsocketServiceInterface,
53     private readonly eventBus: InternalEventBusInterface,
54     readonly documentState: DocumentState | PublicDocumentState,
55     readonly _loadCommit: LoadCommit,
56     readonly _getDocumentMeta: GetDocumentMeta,
57     readonly logger: LoggerInterface,
58   ) {
59     eventBus.addEventHandler(this, WebsocketConnectionEvent.Connecting)
60     eventBus.addEventHandler(this, WebsocketConnectionEvent.FailedToConnect)
61     eventBus.addEventHandler(this, WebsocketConnectionEvent.ConnectedAndReady)
62     eventBus.addEventHandler(this, WebsocketConnectionEvent.Disconnected)
63     eventBus.addEventHandler(this, WebsocketConnectionEvent.DocumentUpdateMessage)
64     eventBus.addEventHandler(this, WebsocketConnectionEvent.EventMessage)
65     eventBus.addEventHandler(this, WebsocketConnectionEvent.AckStatusChange)
66     eventBus.addEventHandler(this, WebsocketConnectionEvent.FailedToGetTokenCommitIdOutOfSync)
68     documentState.subscribeToProperty('editorReady', (value) => {
69       if (value) {
70         this.onEditorReadyEvent()
71       }
72     })
74     documentState.subscribeToProperty('baseCommit', (value) => {
75       if (value) {
76         this.resetSizeTracker(value.byteSize)
77       }
78     })
80     documentState.subscribeToEvent('EditorRequestsPropagationOfUpdate', (payload) => {
81       if (payload.message.type.wrapper !== 'conversion') {
82         this.propagateUpdate(payload.message, payload.debugSource)
83       }
84     })
86     documentState.subscribeToEvent('DriveFileConversionToDocBegan', () => {
87       this.abortWebsocketConnectionAttempt = true
88       this.closeConnection()
89     })
91     documentState.subscribeToEvent('DriveFileConversionToDocSucceeded', () => {
92       this.abortWebsocketConnectionAttempt = false
93       void this.reconnect()
94     })
96     documentState.subscribeToEvent('DebugMenuRequestingCommitWithRTS', (payload) => {
97       void this.debugSendCommitCommandToRTS(payload)
98     })
100     documentState.subscribeToProperty('currentCommitId', (value, previousValue) => {
101       const commitIdUpgraded = value && previousValue && value !== previousValue
102       if (
103         commitIdUpgraded &&
104         !this.websocketService.isConnected(this.documentState.getProperty('entitlements').nodeMeta)
105       ) {
106         this.logger.info('Reconnecting to RTS because currentCommitId changed and we are not connected')
107         void this.reconnect()
108       }
109     })
111     documentState.subscribeToProperty('documentTrashState', (value, previousValue) => {
112       if (
113         value === 'not_trashed' &&
114         previousValue === 'trashed' &&
115         !this.websocketService.isConnected(this.documentState.getProperty('entitlements').nodeMeta)
116       ) {
117         this.logger.info('Reconnecting to RTS because document was untrashed')
118         void this.reconnect()
119       }
120     })
121   }
123   destroy(): void {
124     this.isDestroyed = true
126     if (this.initialSyncTimer) {
127       clearTimeout(this.initialSyncTimer)
128     }
129     if (this.initialConnectionTimer) {
130       clearTimeout(this.initialConnectionTimer)
131     }
132   }
134   public resetSizeTracker(size: number): void {
135     this.sizeTracker.resetWithSize(size)
136   }
138   public closeConnection(): void {
139     this.websocketService.closeConnection(this.documentState.getProperty('entitlements').nodeMeta)
140   }
142   public async reconnect(): Promise<void> {
143     await this.websocketService.reconnectToDocumentWithoutDelay(this.documentState.getProperty('entitlements').nodeMeta)
144   }
146   public async debugSendCommitCommandToRTS(entitlements: DocumentEntitlements): Promise<void> {
147     await this.websocketService.debugSendCommitCommandToRTS(
148       this.documentState.getProperty('entitlements').nodeMeta,
149       entitlements.keys,
150     )
151   }
153   onEditorReadyEvent(): void {
154     if (this.updatesReceivedWhileParentNotReady.length > 0) {
155       this.logger.info(
156         `Playing back ${this.updatesReceivedWhileParentNotReady.length} realtime updates received while editor was not ready`,
157       )
159       for (const message of this.updatesReceivedWhileParentNotReady) {
160         if (message instanceof DecryptedMessage) {
161           void this.handleDocumentUpdatesMessage(message)
162         } else if (message instanceof ProcessedIncomingRealtimeEventMessage) {
163           void this.handleRealtimeServerEvent([message])
164         } else {
165           throw new Error('Attempting to replay unknown message type')
166         }
167       }
169       this.updatesReceivedWhileParentNotReady.length = 0
170     }
171   }
173   async handleEvent(event: InternalEventInterface<unknown>): Promise<void> {
174     if (event.type === WebsocketConnectionEvent.Disconnected) {
175       this.handleWebsocketDisconnectedEvent(
176         event.payload as WebsocketConnectionEventPayloads[WebsocketConnectionEvent.Disconnected],
177       )
178     } else if (event.type === WebsocketConnectionEvent.FailedToConnect) {
179       this.handleWebsocketFailedToConnectEvent()
180     } else if (event.type === WebsocketConnectionEvent.ConnectedAndReady) {
181       this.handleWebsocketConnectedEvent()
182     } else if (event.type === WebsocketConnectionEvent.Connecting) {
183       this.handleWebsocketConnectingEvent()
184     } else if (event.type === WebsocketConnectionEvent.DocumentUpdateMessage) {
185       const { message } =
186         event.payload as WebsocketConnectionEventPayloads[WebsocketConnectionEvent.DocumentUpdateMessage]
187       void this.handleDocumentUpdatesMessage(message)
188     } else if (event.type === WebsocketConnectionEvent.EventMessage) {
189       const { message } = event.payload as WebsocketConnectionEventPayloads[WebsocketConnectionEvent.EventMessage]
190       void this.handleRealtimeServerEvent(message)
191     } else if (event.type === WebsocketConnectionEvent.AckStatusChange) {
192       this.handleWebsocketAckStatusChangeEvent(
193         event.payload as WebsocketConnectionEventPayloads[WebsocketConnectionEvent.AckStatusChange],
194       )
195     } else if (event.type === WebsocketConnectionEvent.FailedToGetTokenCommitIdOutOfSync) {
196       /**
197        * The client was unable to get a token from the Docs API because the Commit ID the client had did not match
198        * what the server was expecting.
199        */
200       void this.refetchCommitDueToStaleContents('token-fail')
201     } else {
202       return
203     }
204   }
206   async handleDocumentUpdatesMessage(message: DecryptedMessage) {
207     if (!this.documentState.getProperty('editorReady')) {
208       this.updatesReceivedWhileParentNotReady.push(message)
210       return
211     }
213     this.sizeTracker.incrementSize(message.byteSize())
215     this.documentState.emitEvent({ name: 'RealtimeReceivedDocumentUpdate', payload: message })
216   }
218   beginInitialConnectionTimer(): void {
219     this.initialConnectionTimer = setTimeout(() => {
220       this.logger.warn('Initial connection with RTS cannot seem to be formed in a reasonable time')
221       this.documentState.setProperty('realtimeConnectionTimedOut', true)
222     }, MAX_MS_TO_WAIT_FOR_RTS_CONNECTION_BEFORE_DISPLAYING_EDITOR)
223   }
225   beginInitialSyncTimer(): void {
226     this.initialSyncTimer = setTimeout(() => {
227       this.logger.warn('Client did not receive ServerHasMoreOrLessGivenTheClientEverythingItHas event in time')
228       this.handleRealtimeConnectionReady()
229     }, MAX_MS_TO_WAIT_FOR_RTS_SYNC_AFTER_CONNECT)
230   }
232   handleRealtimeConnectionReady(): void {
233     if (this.isDestroyed) {
234       return
235     }
237     if (this.initialSyncTimer) {
238       clearTimeout(this.initialSyncTimer)
239       this.initialSyncTimer = null
240     }
242     this.documentState.setProperty('realtimeConnectionTimedOut', false)
244     if (this.documentState.getProperty('realtimeReadyToBroadcast')) {
245       return
246     }
248     this.documentState.setProperty('realtimeReadyToBroadcast', true)
249   }
251   handleWebsocketConnectingEvent(): void {
252     this.documentState.setProperty('realtimeStatus', 'connecting')
254     this.logger.info('Websocket connecting')
255   }
257   handleWebsocketConnectedEvent(): void {
258     this.documentState.setProperty('realtimeStatus', 'connected')
260     this.beginInitialSyncTimer()
262     if (this.initialConnectionTimer) {
263       clearTimeout(this.initialConnectionTimer)
264     }
265   }
267   handleWebsocketDisconnectedEvent(
268     payload: WebsocketConnectionEventPayloads[WebsocketConnectionEvent.Disconnected],
269   ): void {
270     this.documentState.setProperty('realtimeStatus', 'disconnected')
272     const reason = payload.serverReason
274     this.documentState.emitEvent({ name: 'RealtimeConnectionClosed', payload: reason })
276     if (reason.props.code === ConnectionCloseReason.CODES.STALE_COMMIT_ID) {
277       void this.refetchCommitDueToStaleContents('rts-disconnect')
278     }
280     if (reason.props.code === ConnectionCloseReason.CODES.TRAFFIC_ABUSE_MAX_DU_SIZE) {
281       metrics.docs_document_updates_save_error_total.increment({
282         type: 'document_too_big',
283       })
284     } else if (reason.props.code === ConnectionCloseReason.CODES.MESSAGE_TOO_BIG) {
285       metrics.docs_document_updates_save_error_total.increment({
286         type: 'update_too_big',
287       })
288     }
289   }
291   handleWebsocketFailedToConnectEvent(): void {
292     this.documentState.setProperty('realtimeStatus', 'disconnected')
294     this.documentState.emitEvent({ name: 'RealtimeFailedToConnect', payload: undefined })
295   }
297   handleWebsocketAckStatusChangeEvent(
298     event: WebsocketConnectionEventPayloads[WebsocketConnectionEvent.AckStatusChange],
299   ): void {
300     this.documentState.setProperty('realtimeIsExperiencingErroredSync', event.ledger.hasErroredMessages())
301   }
303   initializeConnection(): WebsocketConnectionInterface {
304     const entitlements = this.documentState.getProperty('entitlements')
306     const connection = this.websocketService.createConnection(
307       this.documentState.getProperty('entitlements').nodeMeta,
308       entitlements.keys,
309       {
310         commitId: () => this.documentState.getProperty('currentCommitId'),
311       },
312     )
314     connection
315       .connect(() => {
316         return this.abortWebsocketConnectionAttempt
317       })
318       .catch((e) => {
319         this.logger.error(e)
320       })
322     this.beginInitialConnectionTimer()
324     return connection
325   }
327   public propagateUpdate(message: RtsMessagePayload, debugSource: BroadcastSource): void {
328     if (message.type.wrapper === 'du') {
329       if (!this.sizeTracker.canPostUpdateOfSize(message.content.byteLength)) {
330         this.handleAttemptingToBroadcastUpdateThatIsTooLarge()
331       } else {
332         this.sizeTracker.incrementSize(message.content.byteLength)
334         void this.websocketService.sendDocumentUpdateMessage(
335           this.documentState.getProperty('entitlements').nodeMeta,
336           message.content,
337           debugSource,
338         )
339       }
340     } else if (message.type.wrapper === 'events') {
341       void this.websocketService.sendEventMessage(
342         this.documentState.getProperty('entitlements').nodeMeta,
343         message.content,
344         message.type.eventType,
345         debugSource,
346       )
347     } else {
348       throw new Error('Unknown message type')
349     }
350   }
352   handleAttemptingToBroadcastUpdateThatIsTooLarge(): void {
353     void this.websocketService.flushPendingUpdates()
355     this.logger.error(new Error('Update Too Large'))
357     this.documentState.setProperty('realtimeIsLockedDueToSizeContraint', true)
359     PostApplicationError(this.eventBus, {
360       translatedErrorTitle: c('Error').t`Update Too Large`,
361       translatedError: c('Error')
362         .t`The last update you made to the document is either too large, or would exceed the total document size limit. Editing has been temporarily disabled. Your change will not be saved. Please export a copy of your document and reload the page.`,
363     })
365     this.documentState.emitEvent({ name: 'RealtimeAttemptingToSendUpdateThatIsTooLarge', payload: undefined })
366   }
368   async handleRealtimeServerEvent(events: ProcessedIncomingRealtimeEventMessage[]) {
369     if (!this.documentState.getProperty('editorReady')) {
370       this.updatesReceivedWhileParentNotReady.push(...events)
372       return
373     }
375     for (const event of events) {
376       switch (event.props.type) {
377         case EventTypeEnum.ClientIsRequestingOtherClientsToBroadcastTheirState:
378         case EventTypeEnum.ServerIsRequestingClientToBroadcastItsState:
379           this.documentState.emitEvent({ name: 'RealtimeRequestingClientToBroadcastItsState', payload: undefined })
380           break
381         case EventTypeEnum.ServerIsInformingClientThatTheDocumentCommitHasBeenUpdated:
382           const decodedContent = utf8ArrayToString(event.props.content)
383           const parsedMessage = JSON.parse(decodedContent)
384           this.documentState.setProperty('currentCommitId', parsedMessage.commitId)
385           break
386         case EventTypeEnum.ClientHasSentACommentMessage: {
387           this.eventBus.publish({
388             type: DocControllerEvent.RealtimeCommentMessageReceived,
389             payload: <DocControllerEventPayloads[DocControllerEvent.RealtimeCommentMessageReceived]>{
390               message: event.props.content,
391             },
392           })
394           break
395         }
396         case EventTypeEnum.ClientIsBroadcastingItsPresenceState: {
397           this.documentState.emitEvent({
398             name: 'RealtimeReceivedOtherClientPresenceState',
399             payload: event.props.content,
400           })
401           break
402         }
403         case EventTypeEnum.ServerHasMoreOrLessGivenTheClientEverythingItHas:
404           this.handleRealtimeConnectionReady()
405           break
406         case EventTypeEnum.ServerIsPlacingEmptyActivityIndicatorInStreamToIndicateTheStreamIsStillActive:
407         case EventTypeEnum.ClientIsDebugRequestingServerToPerformCommit:
408         case EventTypeEnum.ServerIsReadyToAcceptClientMessages:
409         case EventTypeEnum.ServerIsNotifyingOtherServersToDisconnectAllClientsFromTheStream:
410           break
411         default:
412           assertUnreachableAndLog(event.props)
413       }
414     }
415   }
417   /**
418    * If the RTS rejects or drops our connection due to our commit ID not being what it has, we will refetch the document
419    * and its binary from the main API and update our content.
420    */
421   async refetchCommitDueToStaleContents(source: 'token-fail' | 'rts-disconnect') {
422     if (this.isRefetchingStaleCommit) {
423       this.logger.info('Attempting to refetch stale commit but refetch already in progress')
424       return
425     }
427     this.isRefetchingStaleCommit = true
429     this.logger.info('Refetching document due to stale commit ID from source', source)
431     const fail = (error: string) => {
432       this.isRefetchingStaleCommit = false
434       this.logger.error(error)
436       this.eventBus.publish({
437         type: DocControllerEvent.UnableToResolveCommitIdConflict,
438         payload: undefined,
439       })
440     }
442     const nodeMeta = this.documentState.getProperty('entitlements').nodeMeta
444     const result = await this._getDocumentMeta.execute(nodeMeta)
445     if (result.isFailed()) {
446       fail(`Failed to reload document meta: ${result.getError()}`)
448       return
449     }
451     const latestCommitId = result.getValue().latestCommitId()
453     if (!latestCommitId || latestCommitId === this.documentState.getProperty('currentCommitId')) {
454       fail(!latestCommitId ? 'Reloaded commit but commit id was null' : 'Reloaded commit id is the same as current')
456       return
457     }
459     const entitlements = this.documentState.getProperty('entitlements')
461     const decryptResult = await this._loadCommit.execute(nodeMeta, latestCommitId, entitlements.keys.documentContentKey)
462     if (decryptResult.isFailed()) {
463       fail(`Failed to reload or decrypt commit: ${decryptResult.getError()}`)
465       return Result.fail(decryptResult.getError())
466     }
468     const decryptedCommit = decryptResult.getValue()
470     this.logger.info(
471       `Reownloaded and decrypted commit id ${decryptedCommit.commitId} with ${decryptedCommit?.numberOfUpdates()} updates`,
472     )
474     this.documentState.setProperty('baseCommit', decryptedCommit)
475     this.documentState.setProperty('currentCommitId', decryptedCommit.commitId)
477     this.isRefetchingStaleCommit = false
478   }