1 import { c } from 'ttag'
2 import type { WebsocketConnectionInterface } from '@proton/docs-shared'
4 assertUnreachableAndLog,
6 ProcessedIncomingRealtimeEventMessage,
9 type InternalEventBusInterface,
10 type InternalEventHandlerInterface,
11 type InternalEventInterface,
12 type RtsMessagePayload,
13 } from '@proton/docs-shared'
14 import type { WebsocketServiceInterface } from '../Services/Websockets/WebsocketServiceInterface'
15 import type { DocumentEntitlements } from '../Types/DocumentEntitlements'
16 import { WebsocketConnectionEvent } from '../Realtime/WebsocketEvent/WebsocketConnectionEvent'
17 import type { WebsocketConnectionEventPayloads } from '../Realtime/WebsocketEvent/WebsocketConnectionEventPayloads'
18 import { ConnectionCloseReason, EventTypeEnum } from '@proton/docs-proto'
19 import { utf8ArrayToString } from '@proton/crypto/lib/utils'
20 import { DocSizeTracker } from '../SizeTracker/SizeTracker'
21 import { PostApplicationError } from '../Application/ApplicationEvent'
22 import type { RealtimeControllerInterface } from './RealtimeControllerInterface'
23 import type { DocumentState, PublicDocumentState } from '../State/DocumentState'
24 import type { LoggerInterface } from '@proton/utils/logs'
25 import metrics from '@proton/metrics'
26 import type { DocControllerEventPayloads } from '../AuthenticatedDocController/AuthenticatedDocControllerEvent'
27 import { DocControllerEvent } from '../AuthenticatedDocController/AuthenticatedDocControllerEvent'
28 import type { GetDocumentMeta } from '../UseCase/GetDocumentMeta'
29 import type { LoadCommit } from '../UseCase/LoadCommit'
/**
 * How long to wait, after the websocket reports connected, for the RTS to tell us it has given us all updates
 * before treating the connection as ready anyway.
 *
 * This should be an upper bound we should not expect to hit, because we expect the RTS to signal this in a
 * timely manner. However, due to DRVDOC-802, this event is not currently received, so the value has been
 * lowered to something nominal as a temporary workaround.
 */
export const MAX_MS_TO_WAIT_FOR_RTS_SYNC_AFTER_CONNECT = 100

/**
 * How long to wait for the initial RTS connection before the document state is flagged with
 * `realtimeConnectionTimedOut`.
 */
export const MAX_MS_TO_WAIT_FOR_RTS_CONNECTION_BEFORE_DISPLAYING_EDITOR = 3000
// Bridges the websocket (RTS) service and the shared document state: the constructor registers this
// instance for WebsocketConnectionEvent notifications and subscribes it to document-state changes.
40 export class RealtimeController implements InternalEventHandlerInterface, RealtimeControllerInterface {
// Handle of the timeout scheduled by beginInitialSyncTimer; handleRealtimeConnectionReady clears it and resets it to null.
41 initialSyncTimer: ReturnType<typeof setTimeout> | null = null
// Handle of the timeout scheduled by beginInitialConnectionTimer; cleared when the websocket reports connected.
42 initialConnectionTimer: ReturnType<typeof setTimeout> | null = null
// Set to true when 'DriveFileConversionToDocBegan' fires and back to false on 'DriveFileConversionToDocSucceeded';
// read back inside initializeConnection.
45 abortWebsocketConnectionAttempt = false
// Byte-size accounting: reset from the base commit's byteSize, incremented for received and sent document
// updates, and consulted before sending one.
46 sizeTracker: DocSizeTracker = new DocSizeTracker()
// Re-entrancy guard for refetchCommitDueToStaleContents.
47 isRefetchingStaleCommit = false
// Messages that arrived while `editorReady` was false; replayed and emptied by onEditorReadyEvent.
49 readonly updatesReceivedWhileParentNotReady: (DecryptedMessage | ProcessedIncomingRealtimeEventMessage)[] = []
// Constructor parameter list followed by the constructor body.
// NOTE(review): the `constructor(` line and several short lines (closing braces, some `if` conditions)
// are not visible in this excerpt.
// Collaborators: the websocket service, the internal event bus, the shared document state, the
// LoadCommit / GetDocumentMeta use cases (used by refetchCommitDueToStaleContents) and a logger.
52 readonly websocketService: WebsocketServiceInterface,
53 private readonly eventBus: InternalEventBusInterface,
54 readonly documentState: DocumentState | PublicDocumentState,
55 readonly _loadCommit: LoadCommit,
56 readonly _getDocumentMeta: GetDocumentMeta,
57 readonly logger: LoggerInterface,
// Register this instance as the handler for each websocket connection event dispatched in handleEvent.
59 eventBus.addEventHandler(this, WebsocketConnectionEvent.Connecting)
60 eventBus.addEventHandler(this, WebsocketConnectionEvent.FailedToConnect)
61 eventBus.addEventHandler(this, WebsocketConnectionEvent.ConnectedAndReady)
62 eventBus.addEventHandler(this, WebsocketConnectionEvent.Disconnected)
63 eventBus.addEventHandler(this, WebsocketConnectionEvent.DocumentUpdateMessage)
64 eventBus.addEventHandler(this, WebsocketConnectionEvent.EventMessage)
65 eventBus.addEventHandler(this, WebsocketConnectionEvent.AckStatusChange)
66 eventBus.addEventHandler(this, WebsocketConnectionEvent.FailedToGetTokenCommitIdOutOfSync)
// On `editorReady` changes, replay updates queued while the editor was not ready
// (presumably only when the new value is truthy — the guarding line is not visible; confirm).
68 documentState.subscribeToProperty('editorReady', (value) => {
70 this.onEditorReadyEvent()
// A new base commit re-bases the size tracker on that commit's byte size.
74 documentState.subscribeToProperty('baseCommit', (value) => {
76 this.resetSizeTracker(value.byteSize)
// Updates the editor wants propagated are forwarded to the RTS, except those wrapped as 'conversion'.
80 documentState.subscribeToEvent('EditorRequestsPropagationOfUpdate', (payload) => {
81 if (payload.message.type.wrapper !== 'conversion') {
82 this.propagateUpdate(payload.message, payload.debugSource)
// When a Drive file conversion begins, flag connection attempts to be aborted and close the connection.
86 documentState.subscribeToEvent('DriveFileConversionToDocBegan', () => {
87 this.abortWebsocketConnectionAttempt = true
88 this.closeConnection()
// Conversion succeeded: clear the abort flag again.
91 documentState.subscribeToEvent('DriveFileConversionToDocSucceeded', () => {
92 this.abortWebsocketConnectionAttempt = false
// Debug-menu hook: forwards the payload to debugSendCommitCommandToRTS (fire-and-forget).
96 documentState.subscribeToEvent('DebugMenuRequestingCommitWithRTS', (payload) => {
97 void this.debugSendCommitCommandToRTS(payload)
// `commitIdUpgraded` is truthy only when both the old and the new commit id are truthy and differ.
// The visible part of the condition additionally requires that we are not currently connected.
100 documentState.subscribeToProperty('currentCommitId', (value, previousValue) => {
101 const commitIdUpgraded = value && previousValue && value !== previousValue
104 !this.websocketService.isConnected(this.documentState.getProperty('entitlements').nodeMeta)
106 this.logger.info('Reconnecting to RTS because currentCommitId changed and we are not connected')
107 void this.reconnect()
// Reconnect when the document goes from 'trashed' to 'not_trashed' while we are not connected.
111 documentState.subscribeToProperty('documentTrashState', (value, previousValue) => {
113 value === 'not_trashed' &&
114 previousValue === 'trashed' &&
115 !this.websocketService.isConnected(this.documentState.getProperty('entitlements').nodeMeta)
117 this.logger.info('Reconnecting to RTS because document was untrashed')
118 void this.reconnect()
// Teardown logic: marks the controller as destroyed (handleRealtimeConnectionReady bails out on this flag)
// and cancels both pending timers.
// NOTE(review): the enclosing method's signature line is not visible in this excerpt — presumably `destroy()`; confirm.
124 this.isDestroyed = true
126 if (this.initialSyncTimer) {
127 clearTimeout(this.initialSyncTimer)
129 if (this.initialConnectionTimer) {
130 clearTimeout(this.initialConnectionTimer)
/**
 * Restarts the document size accounting from a known byte size.
 *
 * @param size - Byte size handed to the tracker as its new starting value.
 */
public resetSizeTracker(size: number): void {
  const { sizeTracker } = this

  sizeTracker.resetWithSize(size)
}
/**
 * Asks the websocket service to close the connection that belongs to this document's node.
 */
public closeConnection(): void {
  const { nodeMeta } = this.documentState.getProperty('entitlements')

  this.websocketService.closeConnection(nodeMeta)
}
/**
 * Asks the websocket service to reconnect to this document's node without delay.
 *
 * @returns A promise that settles once the websocket service's reconnect call settles.
 */
public async reconnect(): Promise<void> {
  const { nodeMeta } = this.documentState.getProperty('entitlements')

  await this.websocketService.reconnectToDocumentWithoutDelay(nodeMeta)
}
// Debug helper: forwards a commit command to the RTS through the websocket service and awaits it.
// NOTE(review): on the visible lines the node meta is read from documentState rather than from the
// `entitlements` argument; the remaining call arguments are not visible in this excerpt.
146 public async debugSendCommitCommandToRTS(entitlements: DocumentEntitlements): Promise<void> {
147 await this.websocketService.debugSendCommitCommandToRTS(
148 this.documentState.getProperty('entitlements').nodeMeta,
// Replays, in arrival order, every message queued in updatesReceivedWhileParentNotReady, then empties the queue.
// Document updates go to handleDocumentUpdatesMessage; realtime events go to handleRealtimeServerEvent
// wrapped in a one-element array. Both calls are fire-and-forget (`void`).
153 onEditorReadyEvent(): void {
154 if (this.updatesReceivedWhileParentNotReady.length > 0) {
156 `Playing back ${this.updatesReceivedWhileParentNotReady.length} realtime updates received while editor was not ready`,
159 for (const message of this.updatesReceivedWhileParentNotReady) {
160 if (message instanceof DecryptedMessage) {
161 void this.handleDocumentUpdatesMessage(message)
162 } else if (message instanceof ProcessedIncomingRealtimeEventMessage) {
163 void this.handleRealtimeServerEvent([message])
// NOTE(review): this throw appears to exit before the queue is emptied below, which would leave
// already-replayed messages queued — confirm this is acceptable.
165 throw new Error('Attempting to replay unknown message type')
// Truncate in place: the field is `readonly`, so the array cannot be reassigned.
169 this.updatesReceivedWhileParentNotReady.length = 0
// Event-bus entry point: routes each WebsocketConnectionEvent this controller registered for to its
// dedicated handler. The bus types payloads as `unknown`, so each branch casts to the payload type
// keyed by the event type. Async handlers are started with `void` and are not awaited here.
173 async handleEvent(event: InternalEventInterface<unknown>): Promise<void> {
174 if (event.type === WebsocketConnectionEvent.Disconnected) {
175 this.handleWebsocketDisconnectedEvent(
176 event.payload as WebsocketConnectionEventPayloads[WebsocketConnectionEvent.Disconnected],
178 } else if (event.type === WebsocketConnectionEvent.FailedToConnect) {
179 this.handleWebsocketFailedToConnectEvent()
180 } else if (event.type === WebsocketConnectionEvent.ConnectedAndReady) {
181 this.handleWebsocketConnectedEvent()
182 } else if (event.type === WebsocketConnectionEvent.Connecting) {
183 this.handleWebsocketConnectingEvent()
184 } else if (event.type === WebsocketConnectionEvent.DocumentUpdateMessage) {
// NOTE(review): the line that binds `message` from this payload is not visible in this excerpt.
186 event.payload as WebsocketConnectionEventPayloads[WebsocketConnectionEvent.DocumentUpdateMessage]
187 void this.handleDocumentUpdatesMessage(message)
188 } else if (event.type === WebsocketConnectionEvent.EventMessage) {
189 const { message } = event.payload as WebsocketConnectionEventPayloads[WebsocketConnectionEvent.EventMessage]
190 void this.handleRealtimeServerEvent(message)
191 } else if (event.type === WebsocketConnectionEvent.AckStatusChange) {
192 this.handleWebsocketAckStatusChangeEvent(
193 event.payload as WebsocketConnectionEventPayloads[WebsocketConnectionEvent.AckStatusChange],
195 } else if (event.type === WebsocketConnectionEvent.FailedToGetTokenCommitIdOutOfSync) {
197 * The client was unable to get a token from the Docs API because the Commit ID the client had did not match
198 * what the server was expecting.
200 void this.refetchCommitDueToStaleContents('token-fail')
/**
 * Handles a decrypted document update received from the RTS.
 *
 * While the editor is not ready the message is only queued for later replay. Once it is ready, the
 * message's byte size is added to the size tracker and the update is announced on the document state.
 *
 * @param message - The decrypted document update.
 */
async handleDocumentUpdatesMessage(message: DecryptedMessage) {
  const editorIsReady = this.documentState.getProperty('editorReady')

  if (editorIsReady) {
    this.sizeTracker.incrementSize(message.byteSize())

    this.documentState.emitEvent({ name: 'RealtimeReceivedDocumentUpdate', payload: message })
  } else {
    this.updatesReceivedWhileParentNotReady.push(message)
  }
}
/**
 * Starts the timeout that flags the document state with `realtimeConnectionTimedOut` when the initial
 * RTS connection has not been formed within MAX_MS_TO_WAIT_FOR_RTS_CONNECTION_BEFORE_DISPLAYING_EDITOR.
 *
 * Any timer still pending from an earlier call is cancelled first. Only the most recent handle is
 * stored, so an un-cancelled older timer could no longer be cleared and would still fire later.
 */
beginInitialConnectionTimer(): void {
  if (this.initialConnectionTimer) {
    clearTimeout(this.initialConnectionTimer)
  }

  this.initialConnectionTimer = setTimeout(() => {
    this.logger.warn('Initial connection with RTS cannot seem to be formed in a reasonable time')
    this.documentState.setProperty('realtimeConnectionTimedOut', true)
  }, MAX_MS_TO_WAIT_FOR_RTS_CONNECTION_BEFORE_DISPLAYING_EDITOR)
}
/**
 * Starts the fallback timeout that treats the realtime connection as ready when the
 * ServerHasMoreOrLessGivenTheClientEverythingItHas event has not arrived within
 * MAX_MS_TO_WAIT_FOR_RTS_SYNC_AFTER_CONNECT.
 *
 * Any timer still pending from an earlier call (e.g. a second connected event shortly after the first)
 * is cancelled first. Only the most recent handle is stored, so an un-cancelled older timer could no
 * longer be cleared and would still fire later.
 */
beginInitialSyncTimer(): void {
  if (this.initialSyncTimer) {
    clearTimeout(this.initialSyncTimer)
  }

  this.initialSyncTimer = setTimeout(() => {
    this.logger.warn('Client did not receive ServerHasMoreOrLessGivenTheClientEverythingItHas event in time')
    this.handleRealtimeConnectionReady()
  }, MAX_MS_TO_WAIT_FOR_RTS_SYNC_AFTER_CONNECT)
}
/**
 * Marks the realtime connection as ready to broadcast.
 *
 * Does nothing once the controller has been destroyed. Otherwise it cancels the pending initial-sync
 * timer, clears the `realtimeConnectionTimedOut` flag and, unless it is already set, sets
 * `realtimeReadyToBroadcast` to true.
 */
handleRealtimeConnectionReady(): void {
  if (this.isDestroyed) {
    return
  }

  const pendingSyncTimer = this.initialSyncTimer
  if (pendingSyncTimer) {
    clearTimeout(pendingSyncTimer)
    this.initialSyncTimer = null
  }

  const { documentState } = this
  documentState.setProperty('realtimeConnectionTimedOut', false)

  // Only write the flag on the transition to ready; repeated calls leave it untouched.
  if (!documentState.getProperty('realtimeReadyToBroadcast')) {
    documentState.setProperty('realtimeReadyToBroadcast', true)
  }
}
/**
 * Records that the websocket is connecting: sets `realtimeStatus` to 'connecting' and logs it.
 */
handleWebsocketConnectingEvent(): void {
  const { documentState, logger } = this

  documentState.setProperty('realtimeStatus', 'connecting')

  logger.info('Websocket connecting')
}
/**
 * Records that the websocket is connected and ready: sets `realtimeStatus` to 'connected', starts the
 * initial-sync fallback timer and cancels the initial-connection timeout.
 */
handleWebsocketConnectedEvent(): void {
  this.documentState.setProperty('realtimeStatus', 'connected')

  this.beginInitialSyncTimer()

  if (this.initialConnectionTimer) {
    clearTimeout(this.initialConnectionTimer)
    // Drop the stale handle, matching how handleRealtimeConnectionReady resets initialSyncTimer.
    this.initialConnectionTimer = null
  }
}
/**
 * Records a websocket disconnect and reacts to the close reason supplied by the server.
 *
 * Sets `realtimeStatus` to 'disconnected', announces the close reason on the document state, starts a
 * commit refetch when the server reports a stale commit id, and counts size-related close codes in
 * the save-error metric.
 *
 * @param payload - Disconnect payload; its `serverReason` carries the close code.
 */
handleWebsocketDisconnectedEvent(
  payload: WebsocketConnectionEventPayloads[WebsocketConnectionEvent.Disconnected],
): void {
  this.documentState.setProperty('realtimeStatus', 'disconnected')

  const { serverReason } = payload

  this.documentState.emitEvent({ name: 'RealtimeConnectionClosed', payload: serverReason })

  const { CODES } = ConnectionCloseReason
  const closeCode = serverReason.props.code

  if (closeCode === CODES.STALE_COMMIT_ID) {
    void this.refetchCommitDueToStaleContents('rts-disconnect')
  }

  switch (closeCode) {
    case CODES.TRAFFIC_ABUSE_MAX_DU_SIZE:
      metrics.docs_document_updates_save_error_total.increment({
        type: 'document_too_big',
      })
      break
    case CODES.MESSAGE_TOO_BIG:
      metrics.docs_document_updates_save_error_total.increment({
        type: 'update_too_big',
      })
      break
    default:
      break
  }
}
/**
 * Records a failed connection attempt: sets `realtimeStatus` to 'disconnected' and emits
 * 'RealtimeFailedToConnect' on the document state.
 */
handleWebsocketFailedToConnectEvent(): void {
  const { documentState } = this

  documentState.setProperty('realtimeStatus', 'disconnected')

  documentState.emitEvent({ name: 'RealtimeFailedToConnect', payload: undefined })
}
/**
 * Mirrors the ack ledger's error status into the document state.
 *
 * @param event - Ack status payload; `realtimeIsExperiencingErroredSync` is set to whatever
 *   `event.ledger.hasErroredMessages()` returns.
 */
handleWebsocketAckStatusChangeEvent(
  event: WebsocketConnectionEventPayloads[WebsocketConnectionEvent.AckStatusChange],
): void {
  const hasErroredMessages = event.ledger.hasErroredMessages()

  this.documentState.setProperty('realtimeIsExperiencingErroredSync', hasErroredMessages)
}
// Creates the websocket connection for this document through the websocket service and starts the
// initial-connection timeout.
// NOTE(review): several lines (remaining createConnection arguments, how the connection is started,
// the return statement) are not visible in this excerpt.
303 initializeConnection(): WebsocketConnectionInterface {
304 const entitlements = this.documentState.getProperty('entitlements')
306 const connection = this.websocketService.createConnection(
307 this.documentState.getProperty('entitlements').nodeMeta,
// Passed as a function so the commit id is read from document state when it is called, not captured now.
310 commitId: () => this.documentState.getProperty('currentCommitId'),
// Presumably the body of a callback that reports whether the connection attempt should be aborted;
// the flag is toggled by the Drive conversion events — confirm against the missing lines.
316 return this.abortWebsocketConnectionAttempt
322 this.beginInitialConnectionTimer()
// Sends an editor-originated message to the RTS.
// 'du' messages are checked against the size tracker before being sent as document updates;
// 'events' messages are sent as event messages with their event type.
// Throws Error('Unknown message type') for any other wrapper.
// NOTE(review): some arguments of the two send calls are not visible in this excerpt.
327 public propagateUpdate(message: RtsMessagePayload, debugSource: BroadcastSource): void {
328 if (message.type.wrapper === 'du') {
// An update the size tracker says cannot be posted is routed to the too-large handler.
329 if (!this.sizeTracker.canPostUpdateOfSize(message.content.byteLength)) {
330 this.handleAttemptingToBroadcastUpdateThatIsTooLarge()
// Account for the outgoing bytes in the size tracker.
332 this.sizeTracker.incrementSize(message.content.byteLength)
334 void this.websocketService.sendDocumentUpdateMessage(
335 this.documentState.getProperty('entitlements').nodeMeta,
340 } else if (message.type.wrapper === 'events') {
341 void this.websocketService.sendEventMessage(
342 this.documentState.getProperty('entitlements').nodeMeta,
344 message.type.eventType,
348 throw new Error('Unknown message type')
// Handles an outgoing update that the size tracker refused (called from propagateUpdate).
// In order: flushes pending websocket updates (fire-and-forget), logs an error, sets the
// `realtimeIsLockedDueToSizeContraint` flag, posts a translated application error on the event bus,
// and emits 'RealtimeAttemptingToSendUpdateThatIsTooLarge' on the document state.
352 handleAttemptingToBroadcastUpdateThatIsTooLarge(): void {
353 void this.websocketService.flushPendingUpdates()
355 this.logger.error(new Error('Update Too Large'))
// The property key's spelling ('Contraint') is a runtime identifier shared with other code; leave it as is.
357 this.documentState.setProperty('realtimeIsLockedDueToSizeContraint', true)
359 PostApplicationError(this.eventBus, {
360 translatedErrorTitle: c('Error').t`Update Too Large`,
361 translatedError: c('Error')
362 .t`The last update you made to the document is either too large, or would exceed the total document size limit. Editing has been temporarily disabled. Your change will not be saved. Please export a copy of your document and reload the page.`,
365 this.documentState.emitEvent({ name: 'RealtimeAttemptingToSendUpdateThatIsTooLarge', payload: undefined })
// Processes a batch of realtime events received from the RTS.
// While the editor is not ready the events are queued for replay by onEditorReadyEvent;
// otherwise each event is dispatched on `event.props.type`.
// NOTE(review): `break`/`default` lines of the switch are not visible in this excerpt.
368 async handleRealtimeServerEvent(events: ProcessedIncomingRealtimeEventMessage[]) {
369 if (!this.documentState.getProperty('editorReady')) {
370 this.updatesReceivedWhileParentNotReady.push(...events)
375 for (const event of events) {
376 switch (event.props.type) {
// Both a peer's and the server's request for state result in the same document-state event.
377 case EventTypeEnum.ClientIsRequestingOtherClientsToBroadcastTheirState:
378 case EventTypeEnum.ServerIsRequestingClientToBroadcastItsState:
379 this.documentState.emitEvent({ name: 'RealtimeRequestingClientToBroadcastItsState', payload: undefined })
// The content is decoded as UTF-8 JSON and its `commitId` becomes the current commit id.
// NOTE(review): the parsed value is used without validation; malformed content would throw inside this
// async method, which callers start with `void` — confirm that is acceptable.
// NOTE(review): these `const` declarations sit directly in the `case` clause, unlike the braced cases below.
381 case EventTypeEnum.ServerIsInformingClientThatTheDocumentCommitHasBeenUpdated:
382 const decodedContent = utf8ArrayToString(event.props.content)
383 const parsedMessage = JSON.parse(decodedContent)
384 this.documentState.setProperty('currentCommitId', parsedMessage.commitId)
// Comment messages are republished on the internal event bus with the raw content.
386 case EventTypeEnum.ClientHasSentACommentMessage: {
387 this.eventBus.publish({
388 type: DocControllerEvent.RealtimeCommentMessageReceived,
389 payload: <DocControllerEventPayloads[DocControllerEvent.RealtimeCommentMessageReceived]>{
390 message: event.props.content,
// Presence state from another client is forwarded to document-state listeners unchanged.
396 case EventTypeEnum.ClientIsBroadcastingItsPresenceState: {
397 this.documentState.emitEvent({
398 name: 'RealtimeReceivedOtherClientPresenceState',
399 payload: event.props.content,
// The server signals it has delivered what it has; treat the connection as ready.
403 case EventTypeEnum.ServerHasMoreOrLessGivenTheClientEverythingItHas:
404 this.handleRealtimeConnectionReady()
// No handling is visible for the following event types (presumably intentionally ignored — confirm).
406 case EventTypeEnum.ServerIsPlacingEmptyActivityIndicatorInStreamToIndicateTheStreamIsStillActive:
407 case EventTypeEnum.ClientIsDebugRequestingServerToPerformCommit:
408 case EventTypeEnum.ServerIsReadyToAcceptClientMessages:
409 case EventTypeEnum.ServerIsNotifyingOtherServersToDisconnectAllClientsFromTheStream:
// Presumably the default branch, logging an event type not covered above — confirm.
412 assertUnreachableAndLog(event.props)
418 * If the RTS rejects or drops our connection due to our commit ID not being what it has, we will refetch the document
419 * and its binary from the main API and update our content.
// Refetches the document meta and the latest commit after the server reported our commit id as stale,
// then installs the decrypted commit as `baseCommit` and its id as `currentCommitId`.
// `source` only identifies the trigger for logging: a failed token request or an RTS disconnect.
// Guarded by isRefetchingStaleCommit so only one refetch runs at a time.
// NOTE(review): if either `execute` call rejects, nothing visible here resets isRefetchingStaleCommit,
// which would block all later refetches — confirm against the missing lines.
421 async refetchCommitDueToStaleContents(source: 'token-fail' | 'rts-disconnect') {
422 if (this.isRefetchingStaleCommit) {
423 this.logger.info('Attempting to refetch stale commit but refetch already in progress')
427 this.isRefetchingStaleCommit = true
429 this.logger.info('Refetching document due to stale commit ID from source', source)
// Shared failure path: releases the guard, logs the error and publishes UnableToResolveCommitIdConflict.
431 const fail = (error: string) => {
432 this.isRefetchingStaleCommit = false
434 this.logger.error(error)
436 this.eventBus.publish({
437 type: DocControllerEvent.UnableToResolveCommitIdConflict,
442 const nodeMeta = this.documentState.getProperty('entitlements').nodeMeta
444 const result = await this._getDocumentMeta.execute(nodeMeta)
445 if (result.isFailed()) {
446 fail(`Failed to reload document meta: ${result.getError()}`)
451 const latestCommitId = result.getValue().latestCommitId()
// A missing commit id, or the one we already have, cannot resolve the conflict.
453 if (!latestCommitId || latestCommitId === this.documentState.getProperty('currentCommitId')) {
454 fail(!latestCommitId ? 'Reloaded commit but commit id was null' : 'Reloaded commit id is the same as current')
459 const entitlements = this.documentState.getProperty('entitlements')
461 const decryptResult = await this._loadCommit.execute(nodeMeta, latestCommitId, entitlements.keys.documentContentKey)
462 if (decryptResult.isFailed()) {
463 fail(`Failed to reload or decrypt commit: ${decryptResult.getError()}`)
// NOTE(review): this is the only visible path that returns a value; the visible callers discard the
// result with `void` — confirm whether returning a Result here is intended.
465 return Result.fail(decryptResult.getError())
468 const decryptedCommit = decryptResult.getValue()
// NOTE(review): "Reownloaded" in this log message looks like a typo for "Redownloaded".
471 `Reownloaded and decrypted commit id ${decryptedCommit.commitId} with ${decryptedCommit?.numberOfUpdates()} updates`,
// Install the fresh commit before its id; both properties have subscribers registered in the constructor.
474 this.documentState.setProperty('baseCommit', decryptedCommit)
475 this.documentState.setProperty('currentCommitId', decryptedCommit.commitId)
477 this.isRefetchingStaleCommit = false