2 -- luacheck: ignore 212/self
4 local cache
= require
"util.cache";
5 local json
= require
"util.json";
6 local sql
= require
"util.sql";
7 local xml_parse
= require
"util.xml".parse
;
8 local uuid
= require
"util.uuid";
9 local resolve_relative_path
= require
"util.paths".resolve_relative_path
;
10 local jid_join
= require
"util.jid".join
;
12 local is_stanza
= require
"util.stanza".is_stanza
;
13 local t_concat
= table.concat
;
15 local noop
= function() end
16 local unpack
= table.unpack
or unpack
; -- luacheck: ignore 113
-- Builds a generic-for iterator over a query result. The inner step function
-- receives the result object as its loop state (result_) and calls it to fetch
-- the next row.
-- NOTE(review): the remaining lines of this function are not part of this
-- listing, so how a row is handed back to the loop is not shown here.
17 local function iterator(result
)
18 return function(result_
)
19 local row
= result_();
26 local default_params
= { driver
= "SQLite3" };
-- Encodes a Lua value for storage as a (type tag, string) pair.
--   string / boolean / number -> the Lua type name, tostring(value)
--   stanza (per is_stanza)    -> "xml", tostring(value)
--   table                     -> "json", json.encode(value)
--   anything else             -> nil, "Unhandled value type: <type>"
-- NOTE(review): what happens when json.encode fails (the captured `err`) is on
-- a line not shown in this listing.
30 local function serialize(value
)
31 local t
= type(value
);
32 if t
== "string" or t
== "boolean" or t
== "number" then
33 return t
, tostring(value
);
34 elseif is_stanza(value
) then
35 return "xml", tostring(value
);
36 elseif t
== "table" then
37 local encoded
,err
= json
.encode(value
);
38 if encoded
then return "json", encoded
; end
41 return nil, "Unhandled value type: "..t
;
-- Inverse of the type-tagged encoding: turns a stored (type tag, string) pair
-- back into a Lua value.
--   "string"  -> value unchanged
--   "boolean" -> true / false for "true" / "false", else nil, "invalid-boolean"
--   "number"  -> tonumber(value), or nil, "invalid-number" when it does not parse
--   "json"    -> whatever json.decode(value) returns
--   "xml"     -> whatever xml_parse(value) returns
--   other     -> nil, "Unhandled value type: <tag>"
43 local function deserialize(t
, value
)
44 if t
== "string" then return value
;
45 elseif t
== "boolean" then
46 if value
== "true" then return true;
47 elseif value
== "false" then return false; end
48 return nil, "invalid-boolean";
49 elseif t
== "number" then
50 value
= tonumber(value
);
51 if value
then return value
; end
52 return nil, "invalid-number";
53 elseif t
== "json" then
54 return json
.decode(value
);
55 elseif t
== "xml" then
56 return xml_parse(value
);
58 return nil, "Unhandled value type: "..t
;
61 local host
= module
.host
;
-- Transaction body used by keyval_store:get. Selects the "key", "type" and
-- "value" columns of every row matching host / user / store, where `user` and
-- `store` are file-level variables assigned by the caller (a nil user is bound
-- as ""). Each row's value is deserialized from its (type, value) columns.
-- Rows with a non-empty key are stored in `result` under that key; a row with
-- the empty key whose value is a table is walked with pairs().
-- NOTE(review): the loop body for the empty-key case and the return statement
-- are not part of this listing — presumably the entries are merged into result.
64 local function keyval_store_get()
68 SELECT "key","type","value"
70 WHERE "host"=? AND "user"=? AND "store"=?;
72 for row
in engine
:select(select_sql
, host
, user
or "", store
) do
75 local v
, e
= deserialize(row
[2], row
[3]);
78 if k
~= "" then result
[k
] = v
; elseif type(v
) == "table" then
79 for a
,b
in pairs(v
) do
-- Transaction body used by keyval_store:set. Replaces the whole store for the
-- current host / user / store (file-level `user` and `store`, nil user bound
-- as ""): existing rows are deleted first, then, when `data` is a non-empty
-- table, one row is inserted per non-empty string key. Entries are also
-- collected in `extradata`, which, if non-empty, is serialized as a single row
-- stored under the empty key "".
-- assert(serialize(...)) raises when a value cannot be serialized.
-- NOTE(review): the branch that decides which keys go to extradata is on lines
-- not shown here — presumably it is the else-branch for non-string/empty keys.
89 local function keyval_store_set(data
)
92 WHERE "host"=? AND "user"=? AND "store"=?
94 engine
:delete(delete_sql
, host
, user
or "", store
);
98 ("host","user","store","key","type","value")
101 if data
and next(data
) ~= nil then
102 local extradata
= {};
103 for key
, value
in pairs(data
) do
104 if type(key
) == "string" and key
~= "" then
105 local t
, encoded_value
= assert(serialize(value
));
106 engine
:insert(insert_sql
, host
, user
or "", store
, key
, t
, encoded_value
);
108 extradata
[key
] = value
;
111 if next(extradata
) ~= nil then
112 local t
, encoded_extradata
= assert(serialize(extradata
));
113 engine
:insert(insert_sql
, host
, user
or "", store
, "", t
, encoded_extradata
);
119 --- Key/value store API (default store type)
121 local keyval_store
= {};
122 keyval_store
.__index
= keyval_store
;
-- Reads the key/value data stored for `username` in this store.
-- Publishes username and self.store through the file-level `user` / `store`
-- variables, then runs keyval_store_get inside a transaction. A failure is
-- logged at error level ("<host>" stands in for a nil username).
-- NOTE(review): the condition guarding the log call and the return statements
-- are not part of this listing.
123 function keyval_store
:get(username
)
124 user
, store
= username
, self
.store
;
125 local ok
, result
= engine
:transaction(keyval_store_get
);
127 module
:log("error", "Unable to read from database %s store for %s: %s", store
, username
or "<host>", result
);
-- Replaces the key/value data stored for `username` in this store with `data`.
-- Publishes username and self.store through the file-level `user` / `store`
-- variables and returns whatever engine:transaction returns for a transaction
-- that runs keyval_store_set(data).
132 function keyval_store
:set(username
, data
)
133 user
,store
= username
,self
.store
;
134 return engine
:transaction(function()
135 return keyval_store_set(data
);
-- Returns an iterator over the distinct "user" values that have rows in this
-- store on the current host. Unlike the get/set methods, a failed transaction
-- is raised with error() rather than returned.
138 function keyval_store
:users()
139 local ok
, result
= engine
:transaction(function()
140 local select_sql
= [[
141 SELECT DISTINCT "user"
143 WHERE "host"=? AND "store"=?;
145 return engine
:select(select_sql
, host
, self
.store
);
147 if not ok
then error(result
); end
148 return iterator(result
);
151 --- Archive store API
153 local archive_item_limit
= module
:get_option_number("storage_archive_item_limit");
154 local archive_item_count_cache
= cache
.new(module
:get_option("storage_archive_item_limit_cache_size", 1000));
156 -- luacheck: ignore 512 431/user 431/store 431/err
157 local map_store
= {};
158 map_store
.__index
= map_store
;
159 map_store
.remove = {};
-- Looks up a single key of a user's map store inside a transaction.
-- For a non-empty string key the row stored under that key is selected and
-- deserialized. Otherwise the row stored under the empty key "" is selected
-- and deserialized, and data[key] is returned from it (nil when missing).
-- A row that fails to deserialize raises through assert, which surfaces as a
-- failed transaction; in that case the method returns nil plus the error.
-- NOTE(review): the return for the non-empty-key branch and the final
-- success return are on lines not shown in this listing.
160 function map_store
:get(username
, key
)
161 local ok
, result
= engine
:transaction(function()
163 SELECT "type", "value"
165 WHERE "host"=? AND "user"=? AND "store"=? AND "key"=?
169 if type(key
) == "string" and key
~= "" then
170 for row
in engine
:select(query
, host
, username
or "", self
.store
, key
) do
171 data
, err
= deserialize(row
[1], row
[2]);
172 assert(data
~= nil, err
);
176 for row
in engine
:select(query
, host
, username
or "", self
.store
, "") do
177 data
, err
= deserialize(row
[1], row
[2]);
178 assert(data
~= nil, err
);
180 return data
and data
[key
] or nil;
183 if not ok
then return nil, result
; end
-- Sets a single key of a user's map store by delegating to set_keys with a
-- one-entry table. A nil `data` is replaced by the self.remove sentinel, which
-- set_keys compares against to delete the key instead of writing it.
186 function map_store
:set(username
, key
, data
)
187 if data
== nil then data
= self
.remove; end
188 return self
:set_keys(username
, { [key
] = data
});
-- Writes several keys of a user's map store in one transaction.
-- For each non-empty string key the existing row is deleted and, unless the
-- value is the self.remove sentinel, a freshly serialized row is inserted.
-- For other keys the table stored under the empty key "" is loaded (a row that
-- fails to deserialize raises through assert), that row is deleted, the entry
-- is assigned into the table, and the table is serialized and re-inserted
-- under "".
-- Returns nil plus the error when the transaction fails.
-- NOTE(review): the else-branch boundary and the success return are on lines
-- not shown in this listing; whether self.remove is translated to nil before
-- `extradata[key] = data` cannot be confirmed from here.
190 function map_store
:set_keys(username
, keydatas
)
191 local ok
, result
= engine
:transaction(function()
192 local delete_sql
= [[
193 DELETE FROM "prosody"
194 WHERE "host"=? AND "user"=? AND "store"=? AND "key"=?;
196 local insert_sql
= [[
197 INSERT INTO "prosody"
198 ("host","user","store","key","type","value")
199 VALUES (?,?,?,?,?,?);
201 local select_extradata_sql
= [[
202 SELECT "type", "value"
204 WHERE "host"=? AND "user"=? AND "store"=? AND "key"=?
207 for key
, data
in pairs(keydatas
) do
208 if type(key
) == "string" and key
~= "" then
209 engine
:delete(delete_sql
,
210 host
, username
or "", self
.store
, key
);
211 if data
~= self
.remove then
212 local t
, value
= assert(serialize(data
));
213 engine
:insert(insert_sql
, host
, username
or "", self
.store
, key
, t
, value
);
216 local extradata
, err
= {};
217 for row
in engine
:select(select_extradata_sql
, host
, username
or "", self
.store
, "") do
218 extradata
, err
= deserialize(row
[1], row
[2]);
219 assert(extradata
~= nil, err
);
221 engine
:delete(delete_sql
, host
, username
or "", self
.store
, "");
222 extradata
[key
] = data
;
223 local t
, value
= assert(serialize(extradata
));
224 engine
:insert(insert_sql
, host
, username
or "", self
.store
, "", t
, value
);
229 if not ok
then return nil, result
; end
233 local archive_store
= {}
234 archive_store
.caps
= {
236 quota
= archive_item_limit
;
239 archive_store
.__index
= archive_store
-- Appends (or replaces) one item in a user's archive.
--   username: archive owner; nil is stored as the empty user "".
--   key:      item id; an existing row with the same key is deleted first and
--             a fresh id is generated with uuid.generate() (the condition
--             choosing between the two is not shown in this listing).
--   value:    payload, stored via serialize(); an unserializable value raises
--             inside the transaction.
--   when:     timestamp, defaults to os.time().
--   with:     related id stored alongside the item.
-- Returns the item key, or nil, "quota-limit" when archive_item_limit is set
-- and already reached, or nil, "Failure while checking quota" when the item
-- count cannot be determined.
-- The per-archive item count is kept in archive_item_count_cache under
-- jid_join(username, host, store) and refreshed from the database on a miss.
240 function archive_store
:append(username
, key
, value
, when
, with
)
241 local user
,store
= username
,self
.store
;
242 local cache_key
= jid_join(username
, host
, store
);
243 local item_count
= archive_item_count_cache
:get(cache_key
);
244 if not item_count
then
245 local ok
, ret
= engine
:transaction(function()
247 SELECT COUNT(*) FROM "prosodyarchive"
248 WHERE "host"=? AND "user"=? AND "store"=?;
-- Bind a nil user as "" exactly like the DELETE and INSERT below do; rows are
-- written with "user" = "", so binding nil here would never match them and
-- the quota count for such an archive would always start from zero.
250 local result
= engine
:select(count_sql
, host
, user
or "", store
);
257 if not ok
or not item_count
then
258 module
:log("error", "Failed while checking quota for %s: %s", username
, ret
);
259 return nil, "Failure while checking quota";
261 archive_item_count_cache
:set(cache_key
, item_count
);
264 if archive_item_limit
then
265 module
:log("debug", "%s has %d items out of %d limit", username
, item_count
, archive_item_limit
);
266 if item_count
>= archive_item_limit
then
267 return nil, "quota-limit";
271 when
= when
or os
.time();
273 local ok
, ret
= engine
:transaction(function()
274 local delete_sql
= [[
275 DELETE FROM "prosodyarchive"
276 WHERE "host"=? AND "user"=? AND "store"=? AND "key"=?;
278 local insert_sql
= [[
279 INSERT INTO "prosodyarchive"
280 ("host", "user", "store", "when", "with", "key", "type", "value")
281 VALUES (?,?,?,?,?,?,?,?);
284 local result
, err
= engine
:delete(delete_sql
, host
, user
or "", store
, key
);
286 item_count
= item_count
- result
:affected();
289 key
= uuid
.generate();
291 local t
, encoded_value
= assert(serialize(value
));
292 engine
:insert(insert_sql
, host
, user
or "", store
, when
, with
, key
, t
, encoded_value
);
293 archive_item_count_cache
:set(cache_key
, item_count
+1);
296 if not ok
then return ok
, ret
; end
297 return ret
; -- the key
300 -- Helpers for building the WHERE clause
-- Extends a WHERE clause under construction with the archive query filters.
-- `where` is a list of SQL condition strings and `args` the list of values
-- bound to their placeholders; both are modified in place, in matching order.
-- Conditions handled here: a lower time bound (query.start), an upper time
-- bound (query["end"]), the related id (query.with) and the item key
-- (query.key). When a BETWEEN condition is written it overwrites the last
-- entry of `where` rather than appending.
-- NOTE(review): the guards deciding when each condition applies are on lines
-- not shown in this listing — presumably each is added only when the
-- corresponding query field is set.
301 local function archive_where(query
, args
, where
)
302 -- Time range, inclusive
304 args
[#args
+1] = query
.start
305 where
[#where
+1] = "\"when\" >= ?"
309 args
[#args
+1] = query
["end"];
311 where
[#where
] = "\"when\" BETWEEN ? AND ?" -- is this inclusive?
313 where
[#where
+1] = "\"when\" <= ?"
319 where
[#where
+1] = "\"with\" = ?";
320 args
[#args
+1] = query
.with
325 where
[#where
+1] = "\"key\" = ?";
326 args
[#args
+1] = query
.key
-- Extends a WHERE clause with exclusive "after this item" / "before this
-- item" bounds. The item ids in query.after and query.before are looked up in
-- "prosodyarchive" and translated into conditions on "sort_id", which are
-- appended to `where` with the looked-up ids appended to `args`.
-- args[1], args[2], args[3] are used as the host, user and store of the
-- lookup, so they must be the first three entries the caller placed in `args`.
-- NOTE(review): archive_store:delete can remove args[2] before calling this
-- function — confirm the lookup still binds the intended columns in that case.
-- Returns nil, "item-not-found" when a referenced item id does not exist.
329 local function archive_where_id_range(query
, args
, where
)
330 -- Before or after specific item, exclusive
331 local id_lookup_sql
= [[
333 FROM "prosodyarchive"
334 WHERE "key" = ? AND "host" = ? AND "user" = ? AND "store" = ?
337 if query
.after
then -- keys better be unique!
338 local after_id
= nil;
339 for row
in engine
:select(id_lookup_sql
, query
.after
, args
[1], args
[2], args
[3]) do
343 return nil, "item-not-found";
345 where
[#where
+1] = '"sort_id" > ?';
346 args
[#args
+1] = after_id
;
349 local before_id
= nil;
-- The upper bound must be resolved from query.before. Looking up query.after
-- here made "before" queries fail with item-not-found (query.after is nil) or,
-- when both were given, bound the range by the wrong item.
350 for row
in engine
:select(id_lookup_sql
, query
.before
, args
[1], args
[2], args
[3]) do
353 if not before_id
then
354 return nil, "item-not-found";
356 where
[#where
+1] = '"sort_id" < ?';
357 args
[#args
+1] = before_id
;
-- Queries a user's archive and returns an iterator over the matching items.
-- Inside one transaction it builds the WHERE clause (host / user / store plus
-- archive_where filters), runs a COUNT(*) over it for the total, caches that
-- total under jid_join(username, host, store) when no start / with / end / key
-- filter is present, applies the after/before id range (a failed id lookup is
-- returned as nil, err), appends query.limit as a bound value and finally
-- selects "key", "type", "value", "when", "with" ordered by "sort_id"
-- (DESC when query.reverse is set, ASC otherwise).
-- Each step of the returned iterator deserializes the row's (type, value)
-- columns — raising through assert on failure — and yields
-- key, value, when, with.
-- A cached total combined with query.limit == 0 and no filters is checked
-- before the transaction.
-- NOTE(review): the early-return for that case, the conditions around the
-- COUNT(*) query and the end-of-results handling are on lines not shown in
-- this listing.
362 function archive_store
:find(username
, query
)
364 local user
,store
= username
,self
.store
;
365 local cache_key
= jid_join(username
, host
, self
.store
);
366 local total
= archive_item_count_cache
:get(cache_key
);
367 if total
~= nil and query
.limit
== 0 and query
.start
== nil and query
.with
== nil and query
["end"] == nil and query
.key
== nil then
370 local ok
, result
, err
= engine
:transaction(function()
372 SELECT "key", "type", "value", "when", "with"
373 FROM "prosodyarchive"
375 ORDER BY "sort_id" %s%s;
377 local args
= { host
, user
or "", store
, };
378 local where
= { "\"host\" = ?", "\"user\" = ?", "\"store\" = ?", };
380 archive_where(query
, args
, where
);
384 local stats
= engine
:select("SELECT COUNT(*) FROM \"prosodyarchive\" WHERE "
385 .. t_concat(where
, " AND "), unpack(args
));
391 if query
.start
== nil and query
.with
== nil and query
["end"] == nil and query
.key
== nil then
392 archive_item_count_cache
:set(cache_key
, total
);
394 if query
.limit
== 0 then -- Skip the real query
399 local ok
, err
= archive_where_id_range(query
, args
, where
);
400 if not ok
then return ok
, err
; end
403 args
[#args
+1] = query
.limit
;
406 sql_query
= sql_query
:format(t_concat(where
, " AND "), query
.reverse
407 and "DESC" or "ASC", query
.limit
and " LIMIT ?" or "");
408 return engine
:select(sql_query
, unpack(args
));
410 if not ok
then return ok
, result
; end
411 if not result
then return nil, err
; end
413 local row
= result();
415 local value
, err
= deserialize(row
[2], row
[3]);
416 assert(value
~= nil, err
);
417 return row
[1], value
, row
[4], row
[5];
-- Summarises a user's archive per correspondent. For the rows matching the
-- query filters it selects, for each distinct "with", the number of items and
-- the earliest and latest "when", and collects them into tables indexed by
-- "with" (counts, earliest, latest).
-- Returns the transaction failure (ok, error) when the transaction fails, and
-- nil, err when the after/before item referenced by the query does not exist.
-- NOTE(review): the shape of the final return value is on lines not shown in
-- this listing.
422 function archive_store
:summary(username
, query
)
424 local user
,store
= username
,self
.store
;
425 local ok
, result
, err
= engine
:transaction(function()
427 SELECT DISTINCT "with", COUNT(*), MIN("when"), MAX("when")
428 FROM "prosodyarchive"
431 ORDER BY "sort_id" %s%s;
433 local args
= { host
, user
or "", store
, };
434 local where
= { "\"host\" = ?", "\"user\" = ?", "\"store\" = ?", };
436 archive_where(query
, args
, where
);
-- Propagate a failed id lookup the same way archive_store:find does. Ignoring
-- it dropped the after/before bound and summarised the whole archive instead
-- of reporting item-not-found.
438 local range_ok
, range_err
= archive_where_id_range(query
, args
, where
);
if not range_ok then return range_ok, range_err; end
441 args
[#args
+1] = query
.limit
;
444 sql_query
= sql_query
:format(t_concat(where
, " AND "), query
.reverse
445 and "DESC" or "ASC", query
.limit
and " LIMIT ?" or "");
446 return engine
:select(sql_query
, unpack(args
));
448 if not ok
then return ok
, result
end
-- The transaction itself succeeded but its body bailed out with nil, err.
if not result then return nil, err; end
450 local earliest
, latest
= {}, {};
452 local with
, count
= row
[1], row
[2];
453 counts
[with
] = count
;
454 earliest
[with
] = row
[3];
455 latest
[with
] = row
[4];
-- Deletes items from an archive.
-- The WHERE clause starts from host / user / store, is extended by
-- archive_where and archive_where_id_range, and is used either in a plain
-- DELETE (query.truncate == nil) or in a driver-specific statement that binds
-- query.truncate as an extra value and orders by "sort_id". "SQLite3" uses a
-- different statement when engine._have_delete_limit is set, and "MySQL" uses
-- a self-join with a numeric "unlimited" value.
-- The cached item count for jid_join(username, host, self.store) is cleared
-- afterwards regardless of the outcome.
-- Returns the number of affected rows plus the statement on success, the
-- transaction failure (ok, error) when the transaction fails, and nil, err
-- when the after/before item referenced by the query does not exist.
-- NOTE(review): the condition under which args[2] / where[2] (the user) are
-- removed, and the exact truncate statements, are on lines not shown in this
-- listing.
464 function archive_store
:delete(username
, query
)
466 local user
,store
= username
,self
.store
;
467 local ok
, stmt
, err
= engine
:transaction(function()
468 local sql_query
= "DELETE FROM \"prosodyarchive\" WHERE %s;";
469 local args
= { host
, user
or "", store
, };
470 local where
= { "\"host\" = ?", "\"user\" = ?", "\"store\" = ?", };
472 table.remove(args
, 2);
473 table.remove(where
, 2);
475 archive_where(query
, args
, where
);
476 local ok
, err
= archive_where_id_range(query
, args
, where
);
477 if not ok
then return ok
, err
; end
478 if query
.truncate
== nil then
479 sql_query
= sql_query
:format(t_concat(where
, " AND "));
481 args
[#args
+1] = query
.truncate
;
482 local unlimited
= "ALL";
484 DELETE FROM "prosodyarchive"
486 SELECT "sort_id" FROM "prosodyarchive"
488 ORDER BY "sort_id" %s
491 if engine
.params
.driver
== "SQLite3" then
492 if engine
._have_delete_limit
then
494 DELETE FROM "prosodyarchive"
496 ORDER BY "sort_id" %s
501 elseif engine
.params
.driver
== "MySQL" then
503 DELETE result FROM prosodyarchive AS result JOIN (
504 SELECT sort_id FROM prosodyarchive
506 ORDER BY "sort_id" %s
508 ) AS limiter on result.sort_id = limiter.sort_id;]];
509 unlimited
= "18446744073709551615";
511 sql_query
= string.format(sql_query
, t_concat(where
, " AND "),
512 query
.reverse
and "ASC" or "DESC", unlimited
);
514 return engine
:delete(sql_query
, unpack(args
));
516 local cache_key
= jid_join(username
, host
, self
.store
);
517 archive_item_count_cache
:set(cache_key
, nil);
-- A failed transaction keeps the original (ok, error) result.
if not ok then return ok, stmt; end
-- The body returns nil, err when the id-range lookup fails; the transaction
-- then succeeds with a nil statement, and calling stmt:affected() on it would
-- raise instead of reporting the error.
if not stmt then return nil, err; end
518 return stmt
:affected(), stmt
;
-- Returns an iterator over the distinct "user" values that have rows in
-- "prosodyarchive" for this store on the current host. A failed transaction is
-- raised with error() rather than returned.
521 function archive_store
:users()
522 local ok
, result
= engine
:transaction(function()
523 local select_sql
= [[
524 SELECT DISTINCT "user"
525 FROM "prosodyarchive"
526 WHERE "host"=? AND "store"=?;
528 return engine
:select(select_sql
, host
, self
.store
);
530 if not ok
then error(result
); end
531 return iterator(result
);
535 keyval
= keyval_store
;
537 archive
= archive_store
;
540 --- Implement storage driver API
542 -- FIXME: Some of these operations need to operate on the archive store(s) too
-- Opens a store of the given type. The store implementation is looked up in
-- `stores` by `typ`, defaulting to "keyval", and the returned object is a
-- table holding the store name with that implementation as its metatable.
-- Returns nil, "unsupported-store" otherwise.
-- NOTE(review): the condition separating the two returns is on a line not
-- shown in this listing — presumably it tests whether store_mt was found.
546 function driver
:open(store
, typ
)
547 local store_mt
= stores
[typ
or "keyval"];
549 return setmetatable({ store
= store
}, store_mt
);
551 return nil, "unsupported-store";
-- Returns an iterator over the distinct store names in the "prosody" table
-- for the current host. With username == true the query uses "user" != ?,
-- otherwise "user" = ?. Only the "prosody" table is consulted, not
-- "prosodyarchive".
-- Returns the transaction failure (ok, error) when the transaction fails.
-- NOTE(review): what `username` is replaced with when it is true or
-- nil/false is on a line not shown in this listing — presumably "".
554 function driver
:stores(username
)
555 local query
= "SELECT DISTINCT \"store\" FROM \"prosody\" WHERE \"host\"=? AND \"user\"" ..
556 (username
== true and "!=?" or "=?");
557 if username
== true or not username
then
560 local ok
, result
= engine
:transaction(function()
561 return engine
:select(query
, host
, username
);
563 if not ok
then return ok
, result
end
564 return iterator(result
);
-- Removes every row belonging to `username` on the current host from both the
-- "prosody" and the "prosodyarchive" tables, inside a single transaction, and
-- returns what engine:transaction returns.
-- Unlike the store methods, username is bound as given (no `or ""` fallback).
567 function driver
:purge(username
)
568 return engine
:transaction(function()
569 engine
:delete("DELETE FROM \"prosody\" WHERE \"host\"=? AND \"user\"=?", host
, username
);
570 engine
:delete("DELETE FROM \"prosodyarchive\" WHERE \"host\"=? AND \"user\"=?", host
, username
);
-- Declares the two tables used by this module with util.sql's Table / Column /
-- Index constructors and creates each of them in its own transaction:
--   * a key/value table with host, user, store, key, type, value columns and
--     the "prosody_index" index on (host, user, store, key);
--   * "prosodyarchive", which adds an auto-increment "sort_id" primary key and
--     the "when" / "with" columns, with indexes on (host, user, store, key),
--     (host, user, store, with, when) and (host, user, store, when).
-- "prosodyarchive_index" is declared unique for every driver except "MySQL".
-- The results of the two transactions are not checked here.
577 local function create_table(engine
) -- luacheck: ignore 431/engine
578 local Table
, Column
, Index
= sql
.Table
, sql
.Column
, sql
.Index
;
580 local ProsodyTable
= Table
{
582 Column
{ name
="host", type="TEXT", nullable
=false };
583 Column
{ name
="user", type="TEXT", nullable
=false };
584 Column
{ name
="store", type="TEXT", nullable
=false };
585 Column
{ name
="key", type="TEXT", nullable
=false };
586 Column
{ name
="type", type="TEXT", nullable
=false };
587 Column
{ name
="value", type="MEDIUMTEXT", nullable
=false };
588 Index
{ name
="prosody_index", "host", "user", "store", "key" };
590 engine
:transaction(function()
591 ProsodyTable
:create(engine
);
594 local ProsodyArchiveTable
= Table
{
595 name
="prosodyarchive";
596 Column
{ name
="sort_id", type="INTEGER", primary_key
=true, auto_increment
=true };
597 Column
{ name
="host", type="TEXT", nullable
=false };
598 Column
{ name
="user", type="TEXT", nullable
=false };
599 Column
{ name
="store", type="TEXT", nullable
=false };
600 Column
{ name
="key", type="TEXT", nullable
=false }; -- item id
601 Column
{ name
="when", type="INTEGER", nullable
=false }; -- timestamp
602 Column
{ name
="with", type="TEXT", nullable
=false }; -- related id
603 Column
{ name
="type", type="TEXT", nullable
=false };
604 Column
{ name
="value", type="MEDIUMTEXT", nullable
=false };
605 Index
{ name
="prosodyarchive_index", unique
= engine
.params
.driver
~= "MySQL", "host", "user", "store", "key" };
606 Index
{ name
="prosodyarchive_with_when", "host", "user", "store", "with", "when" };
607 Index
{ name
="prosodyarchive_when", "host", "user", "store", "when" };
609 engine
:transaction(function()
610 ProsodyArchiveTable
:create(engine
);
-- Checks whether an existing database needs schema changes and, when
-- `apply_changes` is true, performs them. The checks visible here all sit
-- under the params.driver == "MySQL" branch:
--   1. a "value" column of the "prosody" table still typed 'text' is altered
--      to MEDIUMTEXT;
--   2. a unique "prosodyarchive_index" is dropped and recreated as a
--      non-unique index (issue #1073);
--   3. columns of prosody* tables whose character set or collation differ
--      from engine.charset / engine.charset.."_bin" are converted via an
--      intermediate BLOB type.
-- Failures of the first transaction and of the final encoding re-check are
-- logged at error level. The results of the individual ALTER statements in
-- step 3 are not checked.
-- `changes` starts as false.
-- NOTE(review): where `changes` is set and what the function returns are on
-- lines not shown in this listing; module.load treats a truthy result as
-- "upgrade needed".
614 local function upgrade_table(engine
, params
, apply_changes
) -- luacheck: ignore 431/engine
615 local changes
= false;
616 if params
.driver
== "MySQL" then
617 local success
,err
= engine
:transaction(function()
619 local result
= assert(engine
:execute("SHOW COLUMNS FROM \"prosody\" WHERE \"Field\"='value' and \"Type\"='text'"));
620 if result
:rowcount() > 0 then
622 if apply_changes
then
623 module
:log("info", "Upgrading database schema (value column size)...");
624 assert(engine
:execute("ALTER TABLE \"prosody\" MODIFY COLUMN \"value\" MEDIUMTEXT"));
625 module
:log("info", "Database table automatically upgraded");
631 -- Ensure index is not unique (issue #1073)
632 local result
= assert(engine
:execute([[SHOW INDEX FROM prosodyarchive WHERE key_name='prosodyarchive_index' and non_unique=0]]));
633 if result
:rowcount() > 0 then
635 if apply_changes
then
636 module
:log("info", "Upgrading database schema (prosodyarchive_index)...");
637 assert(engine
:execute[[ALTER TABLE "prosodyarchive" DROP INDEX prosodyarchive_index;]]);
638 local new_index
= sql
.Index
{ table = "prosodyarchive", name
="prosodyarchive_index", "host", "user", "store", "key" };
639 assert(engine
:_create_index(new_index
));
640 module
:log("info", "Database table automatically upgraded");
647 module
:log("error", "Failed to check/upgrade database schema (%s), please see "
648 .."https://prosody.im/doc/mysql for help",
649 err
or "unknown error");
653 -- COMPAT w/pre-0.10: Upgrade table to UTF-8 if not already
654 local check_encoding_query
= [[
655 SELECT "COLUMN_NAME","COLUMN_TYPE","TABLE_NAME"
656 FROM "information_schema"."columns"
657 WHERE "TABLE_NAME" LIKE 'prosody%%'
658 AND "TABLE_SCHEMA" = ?
659 AND ( "CHARACTER_SET_NAME"!=? OR "COLLATION_NAME"!=?);
661 -- FIXME Is it ok to ignore the return values from this?
662 engine
:transaction(function()
663 local result
= assert(engine
:execute(check_encoding_query
, params
.database
, engine
.charset
, engine
.charset
.."_bin"));
664 local n_bad_columns
= result
:rowcount();
665 if n_bad_columns
> 0 then
667 if apply_changes
then
668 module
:log("warn", "Found %d columns in prosody table requiring encoding change, updating now...", n_bad_columns
);
669 local fix_column_query1
= "ALTER TABLE \"%s\" CHANGE \"%s\" \"%s\" BLOB;";
670 local fix_column_query2
= "ALTER TABLE \"%s\" CHANGE \"%s\" \"%s\" %s CHARACTER SET '%s' COLLATE '%s_bin';";
671 for row
in result
:rows() do
672 local column_name
, column_type
, table_name
= unpack(row
);
673 module
:log("debug", "Fixing column %s in table %s", column_name
, table_name
);
674 engine
:execute(fix_column_query1
:format(table_name
, column_name
, column_name
));
675 engine
:execute(fix_column_query2
:format(table_name
, column_name
, column_name
, column_type
, engine
.charset
, engine
.charset
));
677 module
:log("info", "Database encoding upgrade complete!");
681 success
,err
= engine
:transaction(function()
682 return engine
:execute(check_encoding_query
, params
.database
,
683 engine
.charset
, engine
.charset
.."_bin");
686 module
:log("error", "Failed to check/upgrade database encoding: %s", err
or "unknown error");
-- Normalises the configured database name. For the "SQLite3" driver, any
-- database other than ":memory:" is resolved relative to prosody.paths.data
-- (or "." when unset), with "prosody.sqlite" as the default file name.
-- NOTE(review): the return value for other drivers and for ":memory:" is on a
-- line not shown in this listing — presumably `database` unchanged.
693 local function normalize_database(driver
, database
) -- luacheck: ignore 431/driver
694 if driver
== "SQLite3" and database
~= ":memory:" then
695 return resolve_relative_path(prosody
.paths
.data
or ".", database
or "prosody.sqlite");
-- Builds the connection parameters used to create / look up an engine from
-- the configured `sql` option. `driver` is required and `database` must
-- normalise to a non-nil value (see normalize_database); either one missing
-- raises a "Configuration error" through assert. username and password are
-- copied through as configured.
-- NOTE(review): the enclosing table constructor and any further fields are on
-- lines not shown in this listing.
700 local function normalize_params(params
)
702 driver
= assert(params
.driver
,
703 "Configuration error: Both the SQL driver and the database need to be specified");
704 database
= assert(normalize_database(params
.driver
, params
.database
),
705 "Configuration error: Both the SQL driver and the database need to be specified");
706 username
= params
.username
;
707 password
= params
.password
;
-- Module entry point; does nothing when running under prosodyctl.
-- Engines are shared between hosts through module:shared("/*/sql/connections")
-- and keyed by sql.db2uri(params), so hosts with identical connection
-- parameters reuse one engine. A new engine is created with a callback that,
-- when the "sql_manage_tables" option is enabled (default true), creates the
-- tables and then runs upgrade_table in check-only mode; a pending upgrade is
-- logged and reported as false, "database upgrade needed".
-- For "SQLite3" the callback also scans PRAGMA compile_options for
-- ENABLE_UPDATE_DELETE_LIMIT and records it in engine._have_delete_limit,
-- which archive_store:delete consults.
-- Finally the driver is registered as a "storage" provider.
-- NOTE(review): the condition deciding when a new engine is created is on a
-- line not shown in this listing — presumably when no shared engine exists.
713 function module
.load()
714 if prosody
.prosodyctl
then return; end
715 local engines
= module
:shared("/*/sql/connections");
716 local params
= normalize_params(module
:get_option("sql", default_params
));
717 engine
= engines
[sql
.db2uri(params
)];
719 module
:log("debug", "Creating new engine");
720 engine
= sql
:create_engine(params
, function (engine
) -- luacheck: ignore 431/engine
721 if module
:get_option("sql_manage_tables", true) then
722 -- Automatically create table, ignore failure (table probably already exists)
723 -- FIXME: we should check in information_schema, etc.
724 create_table(engine
);
725 -- Check whether the table needs upgrading
726 if upgrade_table(engine
, params
, false) then
727 module
:log("error", "Old database format detected. Please run: prosodyctl mod_%s upgrade", module
.name
);
728 return false, "database upgrade needed";
730 if engine
.params
.driver
== "SQLite3" then
731 for row
in engine
:select("PRAGMA compile_options") do
732 if row
[1] == "ENABLE_UPDATE_DELETE_LIMIT" then
733 engine
._have_delete_limit
= true;
739 engines
[sql
.db2uri(params
)] = engine
;
742 module
:provides("storage", driver
);
-- prosodyctl command handler. The first element of `arg` is removed and used
-- as the sub-command.
--   "upgrade": collects the connection parameters of every host in
--   prosody.hosts (falling back to default_params), de-duplicated by
--   sql.db2uri, lists the databases, asks for confirmation through
--   prosodyctl.show_yesno, and then creates an engine for each database and
--   runs upgrade_table with apply_changes = true.
-- Any other sub-command prints "Unknown command" followed by the list of
-- available commands.
-- The file-level `engine` variable is reassigned while upgrading.
-- NOTE(review): the early return after a declined confirmation and the
-- behaviour when no sub-command is given are on lines not shown in this
-- listing.
745 function module
.command(arg
)
746 local config
= require
"core.configmanager";
747 local prosodyctl
= require
"util.prosodyctl";
748 local command
= table.remove(arg
, 1);
749 if command
== "upgrade" then
750 -- We need to find every unique dburi in the config
752 for host
in pairs(prosody
.hosts
) do -- luacheck: ignore 431/host
753 local params
= normalize_params(config
.get(host
, "sql") or default_params
);
754 uris
[sql
.db2uri(params
)] = params
;
756 print("We will check and upgrade the following databases:\n");
757 for _
, params
in pairs(uris
) do
758 print("", "["..params
.driver
.."] "..params
.database
..(params
.host
and " on "..params
.host
or ""));
761 print("Ensure you have working backups of the above databases before continuing! ");
762 if not prosodyctl
.show_yesno("Continue with the database upgrade? [yN]") then
763 print("Ok, no upgrade. But you do have backups, don't you? ...don't you?? :-)");
767 for _
, params
in pairs(uris
) do
768 print("Checking "..params
.database
.."...");
769 engine
= sql
:create_engine(params
);
770 upgrade_table(engine
, params
, true);
774 print("Unknown command: "..command
);
776 print("Available commands:");
777 print("","upgrade - Perform database upgrade");