1 import { stringToUtf8Array } from '@proton/crypto/lib/utils'
2 import type { DocumentKeys, NodeMeta } from '@proton/drive-store'
3 import type { ServerMessageWithDocumentUpdates, ServerMessageWithEvents } from '@proton/docs-proto'
4 import { DecryptedValue, EventTypeEnum } from '@proton/docs-proto'
5 import type { LoggerInterface } from '@proton/utils/logs'
6 import type { GetRealtimeUrlAndToken } from '../../UseCase/CreateRealtimeValetToken'
7 import type { DecryptMessage } from '../../UseCase/DecryptMessage'
8 import type { EncryptMessage } from '../../UseCase/EncryptMessage'
9 import { WebsocketService } from './WebsocketService'
10 import type { InternalEventBusInterface, WebsocketConnectionInterface } from '@proton/docs-shared'
11 import { BroadcastSource } from '@proton/docs-shared'
12 import { Result } from '../../Domain/Result/Result'
13 import type { EncryptionMetadata } from '../../Types/EncryptionMetadata'
14 import type { DocumentConnectionRecord } from './DocumentConnectionRecord'
15 import { WebsocketConnectionEvent } from '../../Realtime/WebsocketEvent/WebsocketConnectionEvent'
16 import { type UpdateDebouncer } from './Debouncer/UpdateDebouncer'
17 import { DocumentDebounceMode } from './Debouncer/DocumentDebounceMode'
18 import type { PrivateKeyReference, SessionKey } from '@proton/crypto'
19 import type { MetricService } from '../Metrics/MetricService'
21 const mockOnReadyContentPayload = new TextEncoder().encode(
22 JSON.stringify({ connectionId: '12345678', clientUpgradeRecommended: true, clientUpgradeRequired: true }),
25 describe('WebsocketService', () => {
26 let service: WebsocketService
27 let eventBus: InternalEventBusInterface
28 let encryptMessage: EncryptMessage
29 let debouncer: UpdateDebouncer
30 let connection: WebsocketConnectionInterface
31 let record: DocumentConnectionRecord
32 let logger: LoggerInterface
33 let document: NodeMeta
34 let keys: DocumentKeys
35 let metricService: MetricService
37 const createService = async (mode: DocumentDebounceMode) => {
46 document = { linkId: 'link-id-123', volumeId: 'volume-id-456' } as NodeMeta
49 documentContentKey: 'key-123' as unknown as SessionKey,
50 userAddressPrivateKey: 'private-key-123' as unknown as PrivateKeyReference,
51 userOwnAddress: 'foo',
56 } as unknown as jest.Mocked<InternalEventBusInterface>
59 execute: jest.fn().mockReturnValue(Result.ok(stringToUtf8Array('123'))),
60 } as unknown as jest.Mocked<EncryptMessage>
66 } as unknown as jest.Mocked<LoggerInterface>
68 service = new WebsocketService(
69 {} as jest.Mocked<GetRealtimeUrlAndToken>,
72 execute: jest.fn().mockReturnValue(Result.ok(stringToUtf8Array('123'))),
73 } as unknown as jest.Mocked<DecryptMessage>,
80 service.createConnection(document, keys, {
81 commitId: () => undefined,
84 record = service.getConnectionRecord('link-id-123')!
86 debouncer = record.debouncer
87 debouncer.setMode(mode)
88 debouncer.markAsReadyToFlush()
90 connection = record.connection
91 connection.broadcastMessage = jest.fn()
92 connection.canBroadcastMessages = jest.fn().mockReturnValue(true)
95 beforeEach(async () => {
96 await createService(DocumentDebounceMode.Realtime)
107 describe('createConnection', () => {
108 it('should post ConnectionEstablishedButNotYetReady when connection is opened', async () => {
109 connection.callbacks.onOpen()
111 expect(eventBus.publish).toHaveBeenCalledWith({
112 type: WebsocketConnectionEvent.ConnectionEstablishedButNotYetReady,
114 document: record.document,
120 describe('sendDocumentUpdateMessage', () => {
121 it('should add to buffer', async () => {
122 debouncer.addUpdates = jest.fn()
124 await service.sendDocumentUpdateMessage(document, new Uint8Array())
126 expect(debouncer.addUpdates).toHaveBeenCalled()
130 describe('handleDocumentUpdateBufferFlush', () => {
131 it('should encrypt updates', async () => {
132 const encryptMock = (service.encryptMessage = jest.fn())
134 await service.handleDocumentUpdateDebouncerFlush(document, new Uint8Array())
136 expect(encryptMock).toHaveBeenCalled()
139 it('should broadcast message', async () => {
140 await service.handleDocumentUpdateDebouncerFlush(document, new Uint8Array())
142 expect(connection.broadcastMessage).toHaveBeenCalled()
145 it('should add message to ack ledger', async () => {
146 service.ledger.messagePosted = jest.fn()
148 await service.handleDocumentUpdateDebouncerFlush(document, new Uint8Array())
150 expect(service.ledger.messagePosted).toHaveBeenCalled()
154 describe('onDocumentConnectionOpened', () => {
155 it('should retry failed messages', async () => {
156 service.retryFailedDocumentUpdatesForDoc = jest.fn()
158 service.onDocumentConnectionReadyToBroadcast(record, mockOnReadyContentPayload)
160 expect(service.retryFailedDocumentUpdatesForDoc).toHaveBeenCalled()
164 describe('onDocumentConnectionReadyToBroadcast', () => {
165 it('should mark connection as ready to broadcast', async () => {
166 connection.markAsReadyToAcceptMessages = jest.fn()
168 service.onDocumentConnectionReadyToBroadcast(record, mockOnReadyContentPayload)
170 expect(connection.markAsReadyToAcceptMessages).toHaveBeenCalled()
173 it('should mark debouncer as ready to flush', () => {
174 debouncer.markAsReadyToFlush = jest.fn()
176 service.onDocumentConnectionReadyToBroadcast(record, mockOnReadyContentPayload)
178 expect(debouncer.markAsReadyToFlush).toHaveBeenCalled()
181 it('should retry failed document updates', () => {
182 service.retryFailedDocumentUpdatesForDoc = jest.fn()
184 service.onDocumentConnectionReadyToBroadcast(record, mockOnReadyContentPayload)
186 expect(service.retryFailedDocumentUpdatesForDoc).toHaveBeenCalled()
189 it('should pass readiness information to eventBus', () => {
190 service.onDocumentConnectionReadyToBroadcast(record, mockOnReadyContentPayload)
191 expect(eventBus.publish).toHaveBeenCalledWith({
192 type: WebsocketConnectionEvent.ConnectedAndReady,
194 document: record.document,
195 readinessInformation: {
196 connectionId: '12345678',
197 clientUpgradeRecommended: true,
198 clientUpgradeRequired: true,
204 it('should log error and call eventBus if content is not parsable', () => {
205 service.onDocumentConnectionReadyToBroadcast(record, new TextEncoder().encode('not parsable'))
206 expect(logger.error).toHaveBeenCalledWith('Unable to parse content from ConnectionReady message')
207 expect(eventBus.publish).toHaveBeenCalledWith({
208 type: WebsocketConnectionEvent.ConnectedAndReady,
210 document: record.document,
211 readinessInformation: undefined,
217 describe('retryAllFailedDocumentUpdates', () => {
218 it('should get ledger unacknowledged updates', async () => {
219 service.ledger.getUnacknowledgedUpdates = jest.fn().mockReturnValue([])
221 service.retryFailedDocumentUpdatesForDoc(document)
223 expect(service.ledger.getUnacknowledgedUpdates).toHaveBeenCalled()
227 describe('handleWindowUnload', () => {
228 it('should not prevent leaving if no unsaved changes', async () => {
229 const event = { preventDefault: jest.fn() } as unknown as BeforeUnloadEvent
231 service.handleWindowUnload(event)
233 expect(event.preventDefault).not.toHaveBeenCalled()
236 it('should prevent leaving if unsaved changes', async () => {
237 debouncer.addUpdates([new DecryptedValue(new Uint8Array())])
239 const event = { preventDefault: jest.fn() } as unknown as BeforeUnloadEvent
241 service.handleWindowUnload(event)
243 expect(event.preventDefault).toHaveBeenCalled()
246 it('should immediately flush a buffer that has pending changes', async () => {
247 debouncer.flush = jest.fn()
249 debouncer.addUpdates([new DecryptedValue(new Uint8Array())])
251 const event = { preventDefault: jest.fn() } as unknown as BeforeUnloadEvent
253 service.handleWindowUnload(event)
255 expect(debouncer.flush).toHaveBeenCalled()
258 it('should prevent leaving if unacked changes', async () => {
259 const event = { preventDefault: jest.fn() } as unknown as BeforeUnloadEvent
261 service.ledger.hasConcerningMessages = jest.fn().mockReturnValue(true)
263 service.handleWindowUnload(event)
265 expect(event.preventDefault).toHaveBeenCalled()
269 describe('flushPendingUpdates', () => {
270 it('should immediately flush a buffer that has pending changes', async () => {
271 debouncer.flush = jest.fn()
273 debouncer.addUpdates([new DecryptedValue(new Uint8Array())])
275 service.flushPendingUpdates()
277 expect(debouncer.flush).toHaveBeenCalled()
281 describe('sendEventMessage', () => {
282 it('should encrypt event message', async () => {
283 const encryptMock = (service.encryptMessage = jest.fn().mockReturnValue(stringToUtf8Array('123')))
285 await service.sendEventMessage(
287 stringToUtf8Array('123'),
288 EventTypeEnum.ClientHasSentACommentMessage,
289 BroadcastSource.AwarenessUpdateHandler,
292 expect(encryptMock).toHaveBeenCalled()
295 it('should ignore sending ClientIsBroadcastingItsPresenceState event if not in realtime mode', async () => {
296 debouncer.getMode = jest.fn().mockReturnValue(DocumentDebounceMode.SinglePlayer)
298 await service.sendEventMessage(
300 stringToUtf8Array('123'),
301 EventTypeEnum.ClientIsBroadcastingItsPresenceState,
302 BroadcastSource.AwarenessUpdateHandler,
305 expect(connection.broadcastMessage).not.toHaveBeenCalled()
308 it('should ignore sending ClientHasSentACommentMessage event if not in realtime mode', async () => {
309 debouncer.getMode = jest.fn().mockReturnValue(DocumentDebounceMode.SinglePlayer)
311 await service.sendEventMessage(
313 stringToUtf8Array('123'),
314 EventTypeEnum.ClientHasSentACommentMessage,
315 BroadcastSource.AwarenessUpdateHandler,
318 expect(connection.broadcastMessage).not.toHaveBeenCalled()
321 it('should send ClientIsBroadcastingItsPresenceState event if in realtime mode', async () => {
322 Object.defineProperty(debouncer, 'isBufferEnabled', { value: false })
324 await service.sendEventMessage(
326 stringToUtf8Array('123'),
327 EventTypeEnum.ClientIsBroadcastingItsPresenceState,
328 BroadcastSource.AwarenessUpdateHandler,
331 expect(connection.broadcastMessage).toHaveBeenCalled()
334 it('should not broadcast if connection cannot send messages', async () => {
335 connection.canBroadcastMessages = jest.fn().mockReturnValue(false)
337 await service.sendEventMessage(
339 stringToUtf8Array('123'),
340 EventTypeEnum.ClientIsBroadcastingItsPresenceState,
341 BroadcastSource.AwarenessUpdateHandler,
344 expect(connection.broadcastMessage).not.toHaveBeenCalled()
348 describe('handleIncomingDocumentUpdatesMessage', () => {
349 it('should put us into realtime mode if message is not ours', async () => {
350 const switchToRealtimeMode = (service.switchToRealtimeMode = jest.fn())
352 await service.handleIncomingDocumentUpdatesMessage(record, {
356 authorAddress: 'bar',
360 } as unknown as ServerMessageWithDocumentUpdates)
362 expect(switchToRealtimeMode).toHaveBeenCalled()
365 it('should not put us into realtime mode if message is ours', async () => {
366 const switchToRealtimeMode = (service.switchToRealtimeMode = jest.fn())
368 await service.handleIncomingDocumentUpdatesMessage(record, {
372 authorAddress: 'foo',
376 } as unknown as ServerMessageWithDocumentUpdates)
378 expect(switchToRealtimeMode).not.toHaveBeenCalled()
382 describe('handleIncomingEventsMessage', () => {
383 it('should switch to realtime mode if event includes ClientIsRequestingOtherClientsToBroadcastTheirState', async () => {
384 const switchToRealtimeMode = (service.switchToRealtimeMode = jest.fn())
387 events: [{ type: EventTypeEnum.ClientIsRequestingOtherClientsToBroadcastTheirState }],
388 } as unknown as ServerMessageWithEvents
390 await service.handleIncomingEventsMessage(record, events)
392 expect(switchToRealtimeMode).toHaveBeenCalled()
395 it('should switch to realtime mode if event includes ClientIsBroadcastingItsPresenceState', async () => {
396 const switchToRealtimeMode = (service.switchToRealtimeMode = jest.fn())
399 events: [{ type: EventTypeEnum.ClientIsBroadcastingItsPresenceState }],
400 } as unknown as ServerMessageWithEvents
402 await service.handleIncomingEventsMessage(record, events)
404 expect(switchToRealtimeMode).toHaveBeenCalled()
407 it('should not switch to realtime mode for all other event types', async () => {
408 const switchToRealtimeMode = (service.switchToRealtimeMode = jest.fn())
412 { type: EventTypeEnum.ServerIsRequestingClientToBroadcastItsState },
413 { type: EventTypeEnum.ServerHasMoreOrLessGivenTheClientEverythingItHas },
414 { type: EventTypeEnum.ServerIsPlacingEmptyActivityIndicatorInStreamToIndicateTheStreamIsStillActive },
415 { type: EventTypeEnum.ClientIsDebugRequestingServerToPerformCommit },
416 { type: EventTypeEnum.ClientHasSentACommentMessage },
417 { type: EventTypeEnum.ServerIsInformingClientThatTheDocumentCommitHasBeenUpdated },
419 } as unknown as ServerMessageWithEvents
421 await service.handleIncomingEventsMessage(record, events)
423 expect(switchToRealtimeMode).not.toHaveBeenCalled()
426 it('should markAsReadyToAcceptMessages on ServerIsReadyToAcceptClientMessages', async () => {
428 events: [{ type: EventTypeEnum.ServerIsReadyToAcceptClientMessages }],
429 } as unknown as ServerMessageWithEvents
431 connection.markAsReadyToAcceptMessages = jest.fn()
433 await service.handleIncomingEventsMessage(record, events)
435 expect(connection.markAsReadyToAcceptMessages).toHaveBeenCalled()
439 describe('handleLedgerStatusChangeCallback', () => {
440 it('should post AckStatusChange event', () => {
441 service.handleLedgerStatusChangeCallback()
443 expect(eventBus.publish).toHaveBeenCalledWith({
444 type: WebsocketConnectionEvent.AckStatusChange,
445 payload: expect.anything(),
450 describe('encryptMessage', () => {
451 it('should publish encryption error event if failed to encrypt', async () => {
452 encryptMessage.execute = jest.fn().mockReturnValue(Result.fail('error'))
454 const spy = (eventBus.publish = jest.fn())
457 await service.encryptMessage(
458 stringToUtf8Array('123'),
459 {} as EncryptionMetadata,
462 BroadcastSource.AwarenessUpdateHandler,
466 expect(spy).toHaveBeenCalledWith({
467 type: WebsocketConnectionEvent.EncryptionError,
470 error: expect.any(String),