2 -- luacheck: ignore 212/self
4 local json
= require
"util.json";
5 local sql
= require
"util.sql";
6 local xml_parse
= require
"util.xml".parse
;
7 local uuid
= require
"util.uuid";
8 local resolve_relative_path
= require
"util.paths".resolve_relative_path
;
10 local is_stanza
= require
"util.stanza".is_stanza
;
11 local t_concat
= table.concat
;
13 local noop
= function() end
14 local unpack
= table.unpack
or unpack
;
15 local function iterator(result
)
16 return function(result_
)
17 local row
= result_();
24 local default_params
= { driver
= "SQLite3" };
28 local function serialize(value
)
29 local t
= type(value
);
30 if t
== "string" or t
== "boolean" or t
== "number" then
31 return t
, tostring(value
);
32 elseif is_stanza(value
) then
33 return "xml", tostring(value
);
34 elseif t
== "table" then
35 local encoded
,err
= json
.encode(value
);
36 if encoded
then return "json", encoded
; end
39 return nil, "Unhandled value type: "..t
;
41 local function deserialize(t
, value
)
42 if t
== "string" then return value
;
43 elseif t
== "boolean" then
44 if value
== "true" then return true;
45 elseif value
== "false" then return false; end
46 return nil, "invalid-boolean";
47 elseif t
== "number" then
48 value
= tonumber(value
);
49 if value
then return value
; end
50 return nil, "invalid-number";
51 elseif t
== "json" then
52 return json
.decode(value
);
53 elseif t
== "xml" then
54 return xml_parse(value
);
56 return nil, "Unhandled value type: "..t
;
59 local host
= module
.host
;
62 local function keyval_store_get()
66 SELECT "key","type","value"
68 WHERE "host"=? AND "user"=? AND "store"=?;
70 for row
in engine
:select(select_sql
, host
, user
or "", store
) do
73 local v
, e
= deserialize(row
[2], row
[3]);
76 if k
~= "" then result
[k
] = v
; elseif type(v
) == "table" then
77 for a
,b
in pairs(v
) do
87 local function keyval_store_set(data
)
90 WHERE "host"=? AND "user"=? AND "store"=?
92 engine
:delete(delete_sql
, host
, user
or "", store
);
96 ("host","user","store","key","type","value")
99 if data
and next(data
) ~= nil then
100 local extradata
= {};
101 for key
, value
in pairs(data
) do
102 if type(key
) == "string" and key
~= "" then
103 local t
, encoded_value
= assert(serialize(value
));
104 engine
:insert(insert_sql
, host
, user
or "", store
, key
, t
, encoded_value
);
106 extradata
[key
] = value
;
109 if next(extradata
) ~= nil then
110 local t
, encoded_extradata
= assert(serialize(extradata
));
111 engine
:insert(insert_sql
, host
, user
or "", store
, "", t
, encoded_extradata
);
117 --- Key/value store API (default store type)
119 local keyval_store
= {};
120 keyval_store
.__index
= keyval_store
;
121 function keyval_store
:get(username
)
122 user
, store
= username
, self
.store
;
123 local ok
, result
= engine
:transaction(keyval_store_get
);
125 module
:log("error", "Unable to read from database %s store for %s: %s", store
, username
or "<host>", result
);
130 function keyval_store
:set(username
, data
)
131 user
,store
= username
,self
.store
;
132 return engine
:transaction(function()
133 return keyval_store_set(data
);
136 function keyval_store
:users()
137 local ok
, result
= engine
:transaction(function()
138 local select_sql
= [[
139 SELECT DISTINCT "user"
141 WHERE "host"=? AND "store"=?;
143 return engine
:select(select_sql
, host
, self
.store
);
145 if not ok
then error(result
); end
146 return iterator(result
);
149 --- Archive store API
151 -- luacheck: ignore 512 431/user 431/store
152 local map_store
= {};
153 map_store
.__index
= map_store
;
154 map_store
.remove = {};
155 function map_store
:get(username
, key
)
156 local ok
, result
= engine
:transaction(function()
158 SELECT "type", "value"
160 WHERE "host"=? AND "user"=? AND "store"=? AND "key"=?
164 if type(key
) == "string" and key
~= "" then
165 for row
in engine
:select(query
, host
, username
or "", self
.store
, key
) do
166 data
, err
= deserialize(row
[1], row
[2]);
167 assert(data
~= nil, err
);
171 for row
in engine
:select(query
, host
, username
or "", self
.store
, "") do
172 data
, err
= deserialize(row
[1], row
[2]);
173 assert(data
~= nil, err
);
175 return data
and data
[key
] or nil;
178 if not ok
then return nil, result
; end
181 function map_store
:set(username
, key
, data
)
182 if data
== nil then data
= self
.remove; end
183 return self
:set_keys(username
, { [key
] = data
});
185 function map_store
:set_keys(username
, keydatas
)
186 local ok
, result
= engine
:transaction(function()
187 local delete_sql
= [[
188 DELETE FROM "prosody"
189 WHERE "host"=? AND "user"=? AND "store"=? AND "key"=?;
191 local insert_sql
= [[
192 INSERT INTO "prosody"
193 ("host","user","store","key","type","value")
194 VALUES (?,?,?,?,?,?);
196 local select_extradata_sql
= [[
197 SELECT "type", "value"
199 WHERE "host"=? AND "user"=? AND "store"=? AND "key"=?
202 for key
, data
in pairs(keydatas
) do
203 if type(key
) == "string" and key
~= "" then
204 engine
:delete(delete_sql
,
205 host
, username
or "", self
.store
, key
);
206 if data
~= self
.remove then
207 local t
, value
= assert(serialize(data
));
208 engine
:insert(insert_sql
, host
, username
or "", self
.store
, key
, t
, value
);
211 local extradata
, err
= {};
212 for row
in engine
:select(select_extradata_sql
, host
, username
or "", self
.store
, "") do
213 extradata
, err
= deserialize(row
[1], row
[2]);
214 assert(extradata
~= nil, err
);
216 engine
:delete(delete_sql
, host
, username
or "", self
.store
, "");
217 extradata
[key
] = data
;
218 local t
, value
= assert(serialize(extradata
));
219 engine
:insert(insert_sql
, host
, username
or "", self
.store
, "", t
, value
);
224 if not ok
then return nil, result
; end
228 local archive_store
= {}
229 archive_store
.caps
= {
232 archive_store
.__index
= archive_store
233 function archive_store
:append(username
, key
, value
, when
, with
)
234 local user
,store
= username
,self
.store
;
235 when
= when
or os
.time();
237 local ok
, ret
= engine
:transaction(function()
238 local delete_sql
= [[
239 DELETE FROM "prosodyarchive"
240 WHERE "host"=? AND "user"=? AND "store"=? AND "key"=?;
242 local insert_sql
= [[
243 INSERT INTO "prosodyarchive"
244 ("host", "user", "store", "when", "with", "key", "type", "value")
245 VALUES (?,?,?,?,?,?,?,?);
248 engine
:delete(delete_sql
, host
, user
or "", store
, key
);
250 key
= uuid
.generate();
252 local t
, encoded_value
= assert(serialize(value
));
253 engine
:insert(insert_sql
, host
, user
or "", store
, when
, with
, key
, t
, encoded_value
);
256 if not ok
then return ok
, ret
; end
257 return ret
; -- the key
260 -- Helpers for building the WHERE clause
261 local function archive_where(query
, args
, where
)
262 -- Time range, inclusive
264 args
[#args
+1] = query
.start
265 where
[#where
+1] = "\"when\" >= ?"
269 args
[#args
+1] = query
["end"];
271 where
[#where
] = "\"when\" BETWEEN ? AND ?" -- is this inclusive?
273 where
[#where
+1] = "\"when\" <= ?"
279 where
[#where
+1] = "\"with\" = ?";
280 args
[#args
+1] = query
.with
285 where
[#where
+1] = "\"key\" = ?";
286 args
[#args
+1] = query
.key
289 local function archive_where_id_range(query
, args
, where
)
290 local args_len
= #args
291 -- Before or after specific item, exclusive
292 if query
.after
then -- keys better be unique!
294 "sort_id" > COALESCE(
297 FROM "prosodyarchive"
298 WHERE "key" = ? AND "host" = ? AND "user" = ? AND "store" = ?
302 args
[args_len
+1], args
[args_len
+2], args
[args_len
+3], args
[args_len
+4] = query
.after
, args
[1], args
[2], args
[3];
303 args_len
= args_len
+ 4
307 "sort_id" < COALESCE(
310 FROM "prosodyarchive"
311 WHERE "key" = ? AND "host" = ? AND "user" = ? AND "store" = ?
315 SELECT MAX("sort_id")+1
316 FROM "prosodyarchive"
320 args
[args_len
+1], args
[args_len
+2], args
[args_len
+3], args
[args_len
+4] = query
.before
, args
[1], args
[2], args
[3];
324 function archive_store
:find(username
, query
)
326 local user
,store
= username
,self
.store
;
328 local ok
, result
= engine
:transaction(function()
330 SELECT "key", "type", "value", "when", "with"
331 FROM "prosodyarchive"
333 ORDER BY "sort_id" %s%s;
335 local args
= { host
, user
or "", store
, };
336 local where
= { "\"host\" = ?", "\"user\" = ?", "\"store\" = ?", };
338 archive_where(query
, args
, where
);
342 local stats
= engine
:select("SELECT COUNT(*) FROM \"prosodyarchive\" WHERE "
343 .. t_concat(where
, " AND "), unpack(args
));
349 if query
.limit
== 0 then -- Skip the real query
354 archive_where_id_range(query
, args
, where
);
357 args
[#args
+1] = query
.limit
;
360 sql_query
= sql_query
:format(t_concat(where
, " AND "), query
.reverse
361 and "DESC" or "ASC", query
.limit
and " LIMIT ?" or "");
362 return engine
:select(sql_query
, unpack(args
));
364 if not ok
then return ok
, result
end
366 local row
= result();
368 local value
, err
= deserialize(row
[2], row
[3]);
369 assert(value
~= nil, err
);
370 return row
[1], value
, row
[4], row
[5];
375 function archive_store
:delete(username
, query
)
377 local user
,store
= username
,self
.store
;
378 local ok
, stmt
= engine
:transaction(function()
379 local sql_query
= "DELETE FROM \"prosodyarchive\" WHERE %s;";
380 local args
= { host
, user
or "", store
, };
381 local where
= { "\"host\" = ?", "\"user\" = ?", "\"store\" = ?", };
383 table.remove(args
, 2);
384 table.remove(where
, 2);
386 archive_where(query
, args
, where
);
387 archive_where_id_range(query
, args
, where
);
388 if query
.truncate
== nil then
389 sql_query
= sql_query
:format(t_concat(where
, " AND "));
391 args
[#args
+1] = query
.truncate
;
392 local unlimited
= "ALL";
393 if engine
.params
.driver
== "SQLite3" then
395 DELETE FROM "prosodyarchive"
397 ORDER BY "sort_id" %s
401 elseif engine
.params
.driver
== "MySQL" then
403 DELETE result FROM prosodyarchive AS result JOIN (
404 SELECT sort_id FROM prosodyarchive
406 ORDER BY "sort_id" %s
408 ) AS limiter on result.sort_id = limiter.sort_id;]];
409 unlimited
= "18446744073709551615";
412 DELETE FROM "prosodyarchive"
414 SELECT "sort_id" FROM "prosodyarchive"
416 ORDER BY "sort_id" %s
420 sql_query
= string.format(sql_query
, t_concat(where
, " AND "),
421 query
.reverse
and "ASC" or "DESC", unlimited
);
423 return engine
:delete(sql_query
, unpack(args
));
425 return ok
and stmt
:affected(), stmt
;
429 keyval
= keyval_store
;
431 archive
= archive_store
;
434 --- Implement storage driver API
436 -- FIXME: Some of these operations need to operate on the archive store(s) too
440 function driver
:open(store
, typ
)
441 local store_mt
= stores
[typ
or "keyval"];
443 return setmetatable({ store
= store
}, store_mt
);
445 return nil, "unsupported-store";
448 function driver
:stores(username
)
449 local query
= "SELECT DISTINCT \"store\" FROM \"prosody\" WHERE \"host\"=? AND \"user\"" ..
450 (username
== true and "!=?" or "=?");
451 if username
== true or not username
then
454 local ok
, result
= engine
:transaction(function()
455 return engine
:select(query
, host
, username
);
457 if not ok
then return ok
, result
end
458 return iterator(result
);
461 function driver
:purge(username
)
462 return engine
:transaction(function()
463 engine
:delete("DELETE FROM \"prosody\" WHERE \"host\"=? AND \"user\"=?", host
, username
);
464 engine
:delete("DELETE FROM \"prosodyarchive\" WHERE \"host\"=? AND \"user\"=?", host
, username
);
471 local function create_table(engine
) -- luacheck: ignore 431/engine
472 local Table
, Column
, Index
= sql
.Table
, sql
.Column
, sql
.Index
;
474 local ProsodyTable
= Table
{
476 Column
{ name
="host", type="TEXT", nullable
=false };
477 Column
{ name
="user", type="TEXT", nullable
=false };
478 Column
{ name
="store", type="TEXT", nullable
=false };
479 Column
{ name
="key", type="TEXT", nullable
=false };
480 Column
{ name
="type", type="TEXT", nullable
=false };
481 Column
{ name
="value", type="MEDIUMTEXT", nullable
=false };
482 Index
{ name
="prosody_index", "host", "user", "store", "key" };
484 engine
:transaction(function()
485 ProsodyTable
:create(engine
);
488 local ProsodyArchiveTable
= Table
{
489 name
="prosodyarchive";
490 Column
{ name
="sort_id", type="INTEGER", primary_key
=true, auto_increment
=true };
491 Column
{ name
="host", type="TEXT", nullable
=false };
492 Column
{ name
="user", type="TEXT", nullable
=false };
493 Column
{ name
="store", type="TEXT", nullable
=false };
494 Column
{ name
="key", type="TEXT", nullable
=false }; -- item id
495 Column
{ name
="when", type="INTEGER", nullable
=false }; -- timestamp
496 Column
{ name
="with", type="TEXT", nullable
=false }; -- related id
497 Column
{ name
="type", type="TEXT", nullable
=false };
498 Column
{ name
="value", type="MEDIUMTEXT", nullable
=false };
499 Index
{ name
="prosodyarchive_index", unique
= engine
.params
.driver
~= "MySQL", "host", "user", "store", "key" };
500 Index
{ name
="prosodyarchive_with_when", "host", "user", "store", "with", "when" };
501 Index
{ name
="prosodyarchive_when", "host", "user", "store", "when" };
503 engine
:transaction(function()
504 ProsodyArchiveTable
:create(engine
);
508 local function upgrade_table(engine
, params
, apply_changes
) -- luacheck: ignore 431/engine
509 local changes
= false;
510 if params
.driver
== "MySQL" then
511 local success
,err
= engine
:transaction(function()
513 local result
= assert(engine
:execute("SHOW COLUMNS FROM \"prosody\" WHERE \"Field\"='value' and \"Type\"='text'"));
514 if result
:rowcount() > 0 then
516 if apply_changes
then
517 module
:log("info", "Upgrading database schema (value column size)...");
518 assert(engine
:execute("ALTER TABLE \"prosody\" MODIFY COLUMN \"value\" MEDIUMTEXT"));
519 module
:log("info", "Database table automatically upgraded");
525 -- Ensure index is not unique (issue #1073)
526 local result
= assert(engine
:execute([[SHOW INDEX FROM prosodyarchive WHERE key_name='prosodyarchive_index' and non_unique=0]]));
527 if result
:rowcount() > 0 then
529 if apply_changes
then
530 module
:log("info", "Upgrading database schema (prosodyarchive_index)...");
531 assert(engine
:execute[[ALTER TABLE "prosodyarchive" DROP INDEX prosodyarchive_index;]]);
532 local new_index
= sql
.Index
{ table = "prosodyarchive", name
="prosodyarchive_index", "host", "user", "store", "key" };
533 assert(engine
:_create_index(new_index
));
534 module
:log("info", "Database table automatically upgraded");
541 module
:log("error", "Failed to check/upgrade database schema (%s), please see "
542 .."https://prosody.im/doc/mysql for help",
543 err
or "unknown error");
547 -- COMPAT w/pre-0.10: Upgrade table to UTF-8 if not already
548 local check_encoding_query
= [[
549 SELECT "COLUMN_NAME","COLUMN_TYPE","TABLE_NAME"
550 FROM "information_schema"."columns"
551 WHERE "TABLE_NAME" LIKE 'prosody%%'
552 AND "TABLE_SCHEMA" = ?
553 AND ( "CHARACTER_SET_NAME"!=? OR "COLLATION_NAME"!=?);
555 -- FIXME Is it ok to ignore the return values from this?
556 engine
:transaction(function()
557 local result
= assert(engine
:execute(check_encoding_query
, params
.database
, engine
.charset
, engine
.charset
.."_bin"));
558 local n_bad_columns
= result
:rowcount();
559 if n_bad_columns
> 0 then
561 if apply_changes
then
562 module
:log("warn", "Found %d columns in prosody table requiring encoding change, updating now...", n_bad_columns
);
563 local fix_column_query1
= "ALTER TABLE \"%s\" CHANGE \"%s\" \"%s\" BLOB;";
564 local fix_column_query2
= "ALTER TABLE \"%s\" CHANGE \"%s\" \"%s\" %s CHARACTER SET '%s' COLLATE '%s_bin';";
565 for row
in result
:rows() do
566 local column_name
, column_type
, table_name
= unpack(row
);
567 module
:log("debug", "Fixing column %s in table %s", column_name
, table_name
);
568 engine
:execute(fix_column_query1
:format(table_name
, column_name
, column_name
));
569 engine
:execute(fix_column_query2
:format(table_name
, column_name
, column_name
, column_type
, engine
.charset
, engine
.charset
));
571 module
:log("info", "Database encoding upgrade complete!");
575 success
,err
= engine
:transaction(function()
576 return engine
:execute(check_encoding_query
, params
.database
,
577 engine
.charset
, engine
.charset
.."_bin");
580 module
:log("error", "Failed to check/upgrade database encoding: %s", err
or "unknown error");
587 local function normalize_database(driver
, database
) -- luacheck: ignore 431/driver
588 if driver
== "SQLite3" and database
~= ":memory:" then
589 return resolve_relative_path(prosody
.paths
.data
or ".", database
or "prosody.sqlite");
594 local function normalize_params(params
)
596 driver
= assert(params
.driver
,
597 "Configuration error: Both the SQL driver and the database need to be specified");
598 database
= assert(normalize_database(params
.driver
, params
.database
),
599 "Configuration error: Both the SQL driver and the database need to be specified");
600 username
= params
.username
;
601 password
= params
.password
;
607 function module
.load()
608 if prosody
.prosodyctl
then return; end
609 local engines
= module
:shared("/*/sql/connections");
610 local params
= normalize_params(module
:get_option("sql", default_params
));
611 engine
= engines
[sql
.db2uri(params
)];
613 module
:log("debug", "Creating new engine");
614 engine
= sql
:create_engine(params
, function (engine
) -- luacheck: ignore 431/engine
615 if module
:get_option("sql_manage_tables", true) then
616 -- Automatically create table, ignore failure (table probably already exists)
617 -- FIXME: we should check in information_schema, etc.
618 create_table(engine
);
619 -- Check whether the table needs upgrading
620 if upgrade_table(engine
, params
, false) then
621 module
:log("error", "Old database format detected. Please run: prosodyctl mod_%s upgrade", module
.name
);
622 return false, "database upgrade needed";
626 engines
[sql
.db2uri(params
)] = engine
;
629 module
:provides("storage", driver
);
632 function module
.command(arg
)
633 local config
= require
"core.configmanager";
634 local prosodyctl
= require
"util.prosodyctl";
635 local command
= table.remove(arg
, 1);
636 if command
== "upgrade" then
637 -- We need to find every unique dburi in the config
639 for host
in pairs(prosody
.hosts
) do -- luacheck: ignore 431/host
640 local params
= normalize_params(config
.get(host
, "sql") or default_params
);
641 uris
[sql
.db2uri(params
)] = params
;
643 print("We will check and upgrade the following databases:\n");
644 for _
, params
in pairs(uris
) do
645 print("", "["..params
.driver
.."] "..params
.database
..(params
.host
and " on "..params
.host
or ""));
648 print("Ensure you have working backups of the above databases before continuing! ");
649 if not prosodyctl
.show_yesno("Continue with the database upgrade? [yN]") then
650 print("Ok, no upgrade. But you do have backups, don't you? ...don't you?? :-)");
654 for _
, params
in pairs(uris
) do
655 print("Checking "..params
.database
.."...");
656 engine
= sql
:create_engine(params
);
657 upgrade_table(engine
, params
, true);
661 print("Unknown command: "..command
);
663 print("Available commands:");
664 print("","upgrade - Perform database upgrade");