Merge branch 'feat/inda-383-daily-stat' into 'main'
[ProtonMail-WebClient.git] / packages / docs-core / lib / Services / Websockets / AckLedger / AckLedger.ts
blob772c9d7bcdaf9139fd23c2435c073ffdb76bf06a
1 import type { LoggerInterface } from '@proton/utils/logs'
2 import type { ClientMessageWithDocumentUpdates, DocumentUpdate, ServerMessageWithMessageAcks } from '@proton/docs-proto'
3 import type { AckLedgerInterface } from './AckLedgerInterface'
4 import metrics from '@proton/metrics'
/** Milliseconds an update may stay unacknowledged before it is treated as errored. */
const MAX_TIME_TO_WAIT_FOR_ACK = 10 * 1_000

/** Milliseconds between two consecutive scans of the unacknowledged updates. */
const HOW_OFTEN_TO_CHECK_FOR_UNACKED_MESSAGES = 1 * 1_000
/**
 * Ledger record for a document update that has been posted but has not yet
 * been acknowledged.
 */
type UnconfirmedLedgerEntry = {
  /** The document update that is awaiting acknowledgement. */
  message: DocumentUpdate
  /** Moment at which the update was recorded as posted. */
  postedAt: Date
}
15 /**
16  * Track which updates have received acknowledgement from the server.
17  */
18 export class AckLedger implements AckLedgerInterface {
19   readonly unconfirmedMessages: Map<string, UnconfirmedLedgerEntry> = new Map()
20   concerningMessages: Set<string> = new Set()
21   erroredMessages: Set<string> = new Set()
22   private avgTimeToReceiveAck = 0.0
23   private checkerTimer: ReturnType<typeof setInterval> | undefined
25   constructor(
26     private logger: LoggerInterface,
27     private readonly statusChangeCallback: () => void,
28   ) {
29     this.beginCheckingForUnackedMessages()
30   }
32   destroy(): void {
33     clearInterval(this.checkerTimer)
34   }
36   hasConcerningMessages(): boolean {
37     return this.concerningMessages.size > 0
38   }
40   hasErroredMessages(): boolean {
41     return this.erroredMessages.size > 0
42   }
44   getUnacknowledgedUpdates(): DocumentUpdate[] {
45     return Array.from(this.unconfirmedMessages.values()).map((entry) => entry.message)
46   }
48   beginCheckingForUnackedMessages(): void {
49     this.checkerTimer = setInterval(() => {
50       this.checkForUnackedMessages()
51     }, HOW_OFTEN_TO_CHECK_FOR_UNACKED_MESSAGES)
52   }
54   checkForUnackedMessages(): void {
55     const now = new Date()
56     const concerningMessages: UnconfirmedLedgerEntry[] = []
57     const erroredMessages: UnconfirmedLedgerEntry[] = []
59     const entries = this.unconfirmedMessages.values()
61     for (const entry of entries) {
62       const timeSincePosted = now.getTime() - entry.postedAt.getTime()
63       if (timeSincePosted > this.thresholdForError()) {
64         erroredMessages.push(entry)
65       } else if (timeSincePosted > this.thresholdForConcern()) {
66         concerningMessages.push(entry)
67       }
68     }
70     if (concerningMessages.length > 0 || erroredMessages.length > 0) {
71       this.notifyOfUnackedMessages(concerningMessages, erroredMessages)
72     }
73   }
75   notifyOfUnackedMessages(
76     concerningMessages: UnconfirmedLedgerEntry[],
77     erroredMessages: UnconfirmedLedgerEntry[],
78   ): void {
79     const newlyConcerningMessages = concerningMessages.filter(
80       (entry) => !this.concerningMessages.has(entry.message.uuid),
81     )
82     const newlyErroredMessages = erroredMessages.filter((entry) => !this.erroredMessages.has(entry.message.uuid))
84     if (newlyConcerningMessages.length) {
85       metrics.docs_document_updates_ack_error_total.increment(
86         {
87           type: 'concern_threshold',
88         },
89         newlyConcerningMessages.length,
90       )
91     }
93     if (newlyErroredMessages.length) {
94       metrics.docs_document_updates_ack_error_total.increment(
95         {
96           type: 'error_threshold',
97         },
98         newlyErroredMessages.length,
99       )
100     }
102     if (newlyConcerningMessages.length > 0 || newlyErroredMessages.length > 0) {
103       this.concerningMessages = new Set([
104         ...this.concerningMessages,
105         ...newlyConcerningMessages.map((entry) => entry.message.uuid),
106       ])
107       this.erroredMessages = new Set([
108         ...this.erroredMessages,
109         ...newlyErroredMessages.map((entry) => entry.message.uuid),
110       ])
112       this.notifyOfStatusChange()
113     }
114   }
116   notifyOfStatusChange(): void {
117     this.statusChangeCallback()
118   }
120   thresholdForConcern(): number {
121     return this.avgTimeToReceiveAck + 2_500
122   }
124   thresholdForError(): number {
125     return MAX_TIME_TO_WAIT_FOR_ACK
126   }
128   messagePosted(message: ClientMessageWithDocumentUpdates): void {
129     for (const update of message.updates.documentUpdates) {
130       this.unconfirmedMessages.set(update.uuid, {
131         message: update,
132         postedAt: new Date(),
133       })
134     }
135   }
137   messageAcknowledgementReceived(message: ServerMessageWithMessageAcks): void {
138     this.logger.info(
139       'Received ack message',
140       message.acks.map((ack) => ack.uuid),
141     )
143     for (const ack of message.acks) {
144       const update = this.unconfirmedMessages.get(ack.uuid)
146       if (update) {
147         const timeToReceiveAck = new Date().getTime() - update.postedAt.getTime()
149         this.avgTimeToReceiveAck = (this.avgTimeToReceiveAck + timeToReceiveAck) / 2
151         this.unconfirmedMessages.delete(ack.uuid)
153         if (this.concerningMessages.has(ack.uuid) || this.erroredMessages.has(ack.uuid)) {
154           this.concerningMessages.delete(ack.uuid)
155           this.erroredMessages.delete(ack.uuid)
156           this.notifyOfStatusChange()
157         }
159         this.logger.info(
160           `Received ack for message ${ack.uuid} in ${timeToReceiveAck}ms avg time to ack: ${this.avgTimeToReceiveAck}ms`,
161         )
163         metrics.docs_realtime_edit_time_to_ack_histogram.observe({
164           Labels: {},
165           Value: timeToReceiveAck,
166         })
167       }
168     }
169   }