1 import type { LoggerInterface } from '@proton/utils/logs'
2 import type { ClientMessageWithDocumentUpdates, DocumentUpdate, ServerMessageWithMessageAcks } from '@proton/docs-proto'
3 import type { AckLedgerInterface } from './AckLedgerInterface'
4 import metrics from '@proton/metrics'
6 const MAX_TIME_TO_WAIT_FOR_ACK = 10_000
8 const HOW_OFTEN_TO_CHECK_FOR_UNACKED_MESSAGES = 1_000
10 type UnconfirmedLedgerEntry = {
11 message: DocumentUpdate
16 * Track which updates have received acknowledgement from the server.
18 export class AckLedger implements AckLedgerInterface {
19 readonly unconfirmedMessages: Map<string, UnconfirmedLedgerEntry> = new Map()
20 concerningMessages: Set<string> = new Set()
21 erroredMessages: Set<string> = new Set()
22 private avgTimeToReceiveAck = 0.0
23 private checkerTimer: ReturnType<typeof setInterval> | undefined
26 private logger: LoggerInterface,
27 private readonly statusChangeCallback: () => void,
29 this.beginCheckingForUnackedMessages()
33 clearInterval(this.checkerTimer)
36 hasConcerningMessages(): boolean {
37 return this.concerningMessages.size > 0
40 hasErroredMessages(): boolean {
41 return this.erroredMessages.size > 0
44 getUnacknowledgedUpdates(): DocumentUpdate[] {
45 return Array.from(this.unconfirmedMessages.values()).map((entry) => entry.message)
48 beginCheckingForUnackedMessages(): void {
49 this.checkerTimer = setInterval(() => {
50 this.checkForUnackedMessages()
51 }, HOW_OFTEN_TO_CHECK_FOR_UNACKED_MESSAGES)
54 checkForUnackedMessages(): void {
55 const now = new Date()
56 const concerningMessages: UnconfirmedLedgerEntry[] = []
57 const erroredMessages: UnconfirmedLedgerEntry[] = []
59 const entries = this.unconfirmedMessages.values()
61 for (const entry of entries) {
62 const timeSincePosted = now.getTime() - entry.postedAt.getTime()
63 if (timeSincePosted > this.thresholdForError()) {
64 erroredMessages.push(entry)
65 } else if (timeSincePosted > this.thresholdForConcern()) {
66 concerningMessages.push(entry)
70 if (concerningMessages.length > 0 || erroredMessages.length > 0) {
71 this.notifyOfUnackedMessages(concerningMessages, erroredMessages)
75 notifyOfUnackedMessages(
76 concerningMessages: UnconfirmedLedgerEntry[],
77 erroredMessages: UnconfirmedLedgerEntry[],
79 const newlyConcerningMessages = concerningMessages.filter(
80 (entry) => !this.concerningMessages.has(entry.message.uuid),
82 const newlyErroredMessages = erroredMessages.filter((entry) => !this.erroredMessages.has(entry.message.uuid))
84 if (newlyConcerningMessages.length) {
85 metrics.docs_document_updates_ack_error_total.increment(
87 type: 'concern_threshold',
89 newlyConcerningMessages.length,
93 if (newlyErroredMessages.length) {
94 metrics.docs_document_updates_ack_error_total.increment(
96 type: 'error_threshold',
98 newlyErroredMessages.length,
102 if (newlyConcerningMessages.length > 0 || newlyErroredMessages.length > 0) {
103 this.concerningMessages = new Set([
104 ...this.concerningMessages,
105 ...newlyConcerningMessages.map((entry) => entry.message.uuid),
107 this.erroredMessages = new Set([
108 ...this.erroredMessages,
109 ...newlyErroredMessages.map((entry) => entry.message.uuid),
112 this.notifyOfStatusChange()
116 notifyOfStatusChange(): void {
117 this.statusChangeCallback()
120 thresholdForConcern(): number {
121 return this.avgTimeToReceiveAck + 2_500
124 thresholdForError(): number {
125 return MAX_TIME_TO_WAIT_FOR_ACK
128 messagePosted(message: ClientMessageWithDocumentUpdates): void {
129 for (const update of message.updates.documentUpdates) {
130 this.unconfirmedMessages.set(update.uuid, {
132 postedAt: new Date(),
137 messageAcknowledgementReceived(message: ServerMessageWithMessageAcks): void {
139 'Received ack message',
140 message.acks.map((ack) => ack.uuid),
143 for (const ack of message.acks) {
144 const update = this.unconfirmedMessages.get(ack.uuid)
147 const timeToReceiveAck = new Date().getTime() - update.postedAt.getTime()
149 this.avgTimeToReceiveAck = (this.avgTimeToReceiveAck + timeToReceiveAck) / 2
151 this.unconfirmedMessages.delete(ack.uuid)
153 if (this.concerningMessages.has(ack.uuid) || this.erroredMessages.has(ack.uuid)) {
154 this.concerningMessages.delete(ack.uuid)
155 this.erroredMessages.delete(ack.uuid)
156 this.notifyOfStatusChange()
160 `Received ack for message ${ack.uuid} in ${timeToReceiveAck}ms avg time to ack: ${this.avgTimeToReceiveAck}ms`,
163 metrics.docs_realtime_edit_time_to_ack_histogram.observe({
165 Value: timeToReceiveAck,