Merge branch '3.0' of https://github.com/calzoneman/sync into 3.0
[KisSync.git] / src / channel-storage / dbstore.js
blob52c395636d7ffc3d38ae076002ef00ef0688436c
1 import Promise from 'bluebird';
2 import { ChannelStateSizeError } from '../errors';
3 import db from '../database';
4 import { Counter } from 'prom-client';
// Module-scoped logger, tagged 'dbstore'.
const LOGGER = require('@calzoneman/jsli')('dbstore');

// Maximum allowed sum of the JSON string lengths of all keys saved for one
// channel (2^20 characters); save() rejects larger states.
const SIZE_LIMIT = 1024 * 1024;

// Fetches every key/value pair stored for a single channel id.
const QUERY_CHANNEL_DATA = 'SELECT `key`, `value` FROM channel_data WHERE channel_id = ?';

// Prometheus counters tracking how much data moves through this store.

// Incremented by the number of rows returned from each load.
const loadRowcount = new Counter({
    name: 'cytube_channel_db_load_rows_total',
    help: 'Total rows loaded from the channel_data table'
});

// Incremented by the JSON string length of each successfully parsed row.
const loadCharcount = new Counter({
    name: 'cytube_channel_db_load_chars_total',
    help: 'Total characters (JSON length) loaded from the channel_data table'
});

// Incremented by the number of rows submitted in each save.
const saveRowcount = new Counter({
    name: 'cytube_channel_db_save_rows_total',
    help: 'Total rows saved in the channel_data table'
});

// Incremented by the total JSON string length submitted in each save.
const saveCharcount = new Counter({
    name: 'cytube_channel_db_save_chars_total',
    help: 'Total characters (JSON length) saved in the channel_data table'
});

/**
 * Promise adapter around the callback-based `db.query`.
 *
 * @param {string} query - SQL text containing `?` placeholders
 * @param {Array} substitutions - values bound to the placeholders, in order
 * @returns {Promise} resolves with the result handed to the callback; rejects
 *   with an Error (non-Error failures are wrapped in `new Error(...)`)
 */
function queryAsync(query, substitutions) {
    const executor = (fulfill, fail) => {
        const onResult = (failure, result) => {
            if (!failure) {
                fulfill(result);
                return;
            }

            // Guarantee that rejections always carry a real Error object.
            fail(failure instanceof Error ? failure : new Error(failure));
        };

        db.query(query, substitutions, onResult);
    };

    return new Promise(executor);
}

/**
 * Builds a multi-row upsert statement for the channel_data table.
 *
 * Each entry contributes one `(?, ?, ?)` placeholder tuple, so the caller
 * must supply three substitutions per entry.
 *
 * @param {number} numEntries - number of rows the statement should carry
 * @returns {string} the INSERT ... ON DUPLICATE KEY UPDATE statement
 */
function buildUpdateQuery(numEntries) {
    const tuples = [];
    let emitted = 0;
    while (emitted < numEntries) {
        tuples[emitted] = '(?, ?, ?)';
        emitted += 1;
    }

    const valuesClause = tuples.join(', ');
    return [
        `INSERT INTO channel_data VALUES ${valuesClause} `,
        'ON DUPLICATE KEY UPDATE `value` = VALUES(`value`)'
    ].join('');
}

/**
 * Channel state store backed by the channel_data table.
 *
 * Each top-level key of a channel's state object is stored as its own row,
 * with the value serialized as JSON.
 */
export class DatabaseStore {
    /**
     * Loads all stored state for a channel.
     *
     * Rows whose value is not valid JSON are logged and left out of the
     * result rather than failing the whole load.
     *
     * @param {number} id - channel id used to look up rows
     * @param {string} channelName - channel name, used only in messages
     * @returns {Promise<Object>} resolves with an object mapping each row's
     *   key to its parsed value; rejects if `id` is falsy or the query fails
     */
    load(id, channelName) {
        if (!id) {
            return Promise.reject(new Error(`Cannot load state for [${channelName}]: ` +
                                            `id was passed as [${id}]`));
        }

        return queryAsync(QUERY_CHANNEL_DATA, [id]).then(rows => {
            loadRowcount.inc(rows.length);

            const data = {};
            rows.forEach(row => {
                try {
                    data[row.key] = JSON.parse(row.value);
                    loadCharcount.inc(row.value.length);
                } catch (e) {
                    LOGGER.error(`Channel data for channel "${channelName}", ` +
                                 `key "${row.key}" is invalid: ${e}`);
                }
            });

            return data;
        });
    }

    /**
     * Saves a channel's state, one row per key, inserting or updating as
     * needed.
     *
     * Entries that cannot be represented in JSON are skipped: `undefined`
     * values, and values for which `JSON.stringify` returns `undefined`
     * (functions, symbols, or a `toJSON()` that returns `undefined`).
     *
     * @param {number} id - channel id the rows are stored under
     * @param {string} channelName - channel name, used only in messages
     * @param {Object} data - state object; each enumerable key becomes a row
     * @returns {Promise} resolves with the query result, or with `undefined`
     *   when there was nothing to save
     * @throws {Error} if `id` is falsy
     * @throws {ChannelStateSizeError} if the summed JSON string length of all
     *   values exceeds SIZE_LIMIT
     */
    async save(id, channelName, data) {
        if (!id) {
            throw new Error(
                `Cannot save state for [${channelName}]: ` +
                `id was passed as [${id}]`
            );
        }

        let totalSize = 0;
        let rowCount = 0;
        const substitutions = [];

        for (const key in data) {
            if (typeof data[key] === 'undefined') {
                continue;
            }

            const value = JSON.stringify(data[key]);
            // JSON.stringify yields undefined (not a string) for values with
            // no JSON representation; reading .length on it would throw.
            if (typeof value === 'undefined') {
                continue;
            }

            rowCount++;
            totalSize += value.length;

            // Order must match the (?, ?, ?) tuples from buildUpdateQuery.
            substitutions.push(id, key, value);
        }

        if (rowCount === 0) {
            return;
        }

        if (totalSize > SIZE_LIMIT) {
            throw new ChannelStateSizeError(
                'Channel state size is too large',
                {
                    limit: SIZE_LIMIT,
                    actual: totalSize
                }
            );
        }

        // Counters are bumped before the query runs, so they reflect rows
        // submitted rather than rows confirmed written.
        saveRowcount.inc(rowCount);
        saveCharcount.inc(totalSize);

        return queryAsync(buildUpdateQuery(rowCount), substitutions);
    }
}