Update criteria for partial/full IPv6 support.
[tor-metrics-tasks.git] / task-8462 / init-userstats.sql
blob7c5df3d82f57da42c1b92662062a4f2cc61028a5
1 -- Copyright 2013 The Tor Project
2 -- See LICENSE for licensing information
4 -- Use enum types for dimensions that may only change if we write new code
5 -- to support them.  For example, if there's a new node type beyond relay
6 -- and bridge, we'll have to write code to support it.  This is in
7 -- contrast to dimensions like country, transport, or version which don't
8 -- have their possible values hard-coded anywhere.
9 CREATE TYPE node AS ENUM ('relay', 'bridge');
10 CREATE TYPE metric AS ENUM ('responses', 'bytes', 'status');
12 -- All new data first goes into the imported table.  The import tool
13 -- should do some trivial checks for invalid or duplicate data, but
14 -- ultimately, we're going to do these checks in the database.  For
15 -- example, the import tool could avoid importing data from the same
16 -- descriptor more than once, but it's fine to import the same history
17 -- string from distinct descriptors multiple times.  The import tool must,
18 -- however, make sure that stats_end is not greater than 00:00:00 of the
19 -- day following stats_start.  There are no constraints set on this table,
20 -- because importing data should be really, really fast.  Once the newly
21 -- imported data is successfully processed, the imported table is emptied.
22 CREATE TABLE imported (
24   -- The 40-character upper-case hex string identifies a descriptor
25   -- uniquely and is used to join metrics (responses, bytes, status)
26   -- published by the same node (relay or bridge).
27   fingerprint CHARACTER(40) NOT NULL,
29   -- The node type is used to decide the statistics that this entry will
30   -- be part of.
31   node node NOT NULL,
33   -- The metric of this entry describes the stored observation type.
34   -- We'll want to store different metrics published by a node:
35   -- - 'responses' are the number of v3 network status consensus requests
36   --   that the node responded to;
37   -- - 'bytes' are the number of bytes that the node wrote when answering
38   --   directory requests;
39   -- - 'status' are the intervals when the node was listed as running in
40   --   the network status published by either the directory authorities or
41   --   bridge authority.
42   metric metric NOT NULL,
44   -- The two-letter lower-case country code that the observation in this
45   -- entry can be attributed to; can be '??' if no country information is
46   -- known for this entry, or '' (empty string) if this entry summarizes
47   -- observations for all countries.
48   country CHARACTER VARYING(2) NOT NULL,
50   -- The pluggable transport name that the observation in this entry can
51   -- be attributed to; can be '<OR>' if no pluggable transport was used,
52   -- '<??>' if an unknown pluggable transport was used, or '' (empty
53   -- string) if this entry summarizes observations for all transports.
54   transport CHARACTER VARYING(20) NOT NULL,
56   -- The IP address version that the observation in this entry can be
57   -- attributed to; can be 'v4' or 'v6' or '' (empty string) if this entry
58   -- summarizes observations for all IP address versions.
59   version CHARACTER VARYING(2) NOT NULL,
61   -- The interval start of this observation.
62   stats_start TIMESTAMP WITHOUT TIME ZONE NOT NULL,
64   -- The interval end of this observation.  This timestamp must be greater
65   -- than stats_start and must not be greater than 00:00:00 of the day
66   -- following stats_start, which the import tool must make sure.
67   stats_end TIMESTAMP WITHOUT TIME ZONE NOT NULL,
69   -- Finally, the observed value.
70   val DOUBLE PRECISION NOT NULL
73 -- After importing new data into the imported table, they are merged into
74 -- the merged table using the merge() function.  The merged table contains
75 -- the same data as the imported table, except:
76 -- (1) there are no duplicate or overlapping entries in the merged table
77 --     with respect to stats_start and stats_end and the same fingerprint,
78 --     node, metric, country, transport, and version columns;
79 -- (2) all subsequent intervals with the same node, metric, country,
80 --     transport, version, and stats_start date are compressed into a
81 --     single entry.
82 CREATE TABLE merged (
84   -- The unique key that is only used when merging newly imported data
85   -- into this table.
86   id SERIAL PRIMARY KEY,
88   -- All other columns have the same meaning as in the imported table.
89   fingerprint CHARACTER(40) NOT NULL,
90   node node NOT NULL,
91   metric metric NOT NULL,
92   country CHARACTER VARYING(2) NOT NULL,
93   transport CHARACTER VARYING(20) NOT NULL,
94   version CHARACTER VARYING(2) NOT NULL,
95   stats_start TIMESTAMP WITHOUT TIME ZONE NOT NULL,
96   stats_end TIMESTAMP WITHOUT TIME ZONE NOT NULL,
97   val DOUBLE PRECISION NOT NULL
100 -- After merging new data into the merged table, they are aggregated to
101 -- daily user number estimates using the aggregate() function.  Only dates
102 -- with new data in the imported table will be recomputed in the
103 -- aggregated table.  The aggregated components follow the algorithm
104 -- proposed in Tor Tech Report 2012-10-001.
105 CREATE TABLE aggregated (
107   -- The date of these aggregated observations.
108   date DATE NOT NULL,
110   -- The node, country, transport, and version columns all have the same
111   -- meaning as in the imported table.
112   node node NOT NULL,
113   country CHARACTER VARYING(2) NOT NULL DEFAULT '',
114   transport CHARACTER VARYING(20) NOT NULL DEFAULT '',
115   version CHARACTER VARYING(2) NOT NULL DEFAULT '',
117   -- Total number of reported responses, possibly broken down by country,
118   -- transport, or version if either of them is not ''.  See r(R) in the
119   -- tech report.
120   rrx DOUBLE PRECISION NOT NULL DEFAULT 0,
122   -- Total number of seconds of nodes reporting responses, possibly broken
123   -- down by country, transport, or version if either of them is not ''.
124   -- This would be referred to as n(R) in the tech report, though it's not
125   -- used there.
126   nrx DOUBLE PRECISION NOT NULL DEFAULT 0,
128   -- Total number of reported bytes.  See h(H) in the tech report.
129   hh DOUBLE PRECISION NOT NULL DEFAULT 0,
131   -- Total number of seconds of nodes in the status.  See n(N) in the tech
132   -- report.
133   nn DOUBLE PRECISION NOT NULL DEFAULT 0,
135   -- Number of reported bytes of nodes that reported both responses and
136   -- bytes.  See h(R intersect H) in the tech report.
137   hrh DOUBLE PRECISION NOT NULL DEFAULT 0,
139   -- Number of seconds of nodes reporting bytes.  See n(H) in the tech
140   -- report.
141   nh DOUBLE PRECISION NOT NULL DEFAULT 0,
143   -- Number of seconds of nodes reporting responses but no bytes.  See
144   -- n(R \ H) in the tech report.
145   nrh DOUBLE PRECISION NOT NULL DEFAULT 0
148 CREATE LANGUAGE plpgsql;
150 -- Merge new entries from the imported table into the merged table, and
151 -- compress them while doing so.  This function first executes a query to
152 -- match all entries in the imported table with adjacent or even
153 -- overlapping entries in the merged table.  It then loops over query
154 -- results and either inserts or updates entries in the merged table.  The
155 -- idea is to leave query optimization to the database and only touch
156 -- as few entries as possible while running this function.
157 CREATE OR REPLACE FUNCTION merge() RETURNS VOID AS $$
158 DECLARE
160   -- The current record that we're handling in the loop body.
161   cur RECORD;
163   -- Various information about the last record we processed, so that we
164   -- can merge the current record with the last one if possible.
165   last_fingerprint CHARACTER(40) := NULL;
166   last_node node;
167   last_metric metric;
168   last_country CHARACTER VARYING(2);
169   last_transport CHARACTER VARYING(20);
170   last_version CHARACTER VARYING(2);
171   last_start TIMESTAMP WITHOUT TIME ZONE;
172   last_end TIMESTAMP WITHOUT TIME ZONE;
173   last_id INTEGER;
174   last_val DOUBLE PRECISION;
176   -- Interval end and value of the last record before updating them in the
177   -- last loop step.  In a few edge cases, we may update an entry and
178   -- learn in the next loop step that the updated entry overlaps with the
179   -- subsequent entry.  In these cases we'll have to undo the update,
180   -- which is why we're storing the updated values.
181   undo_end TIMESTAMP WITHOUT TIME ZONE;
182   undo_val DOUBLE PRECISION;
184 BEGIN
185   RAISE NOTICE '% Starting to merge.', timeofday();
187   -- TODO Maybe we'll have to materialize a merged_part table that only
188   -- contains dates IN (SELECT DISTINCT DATE(stats_start) FROM imported)
189   -- and use that in the query below.
191   -- Loop over results from a query that joins new entries in the imported
192   -- table with existing entries in the merged table.
193   FOR cur IN SELECT DISTINCT
195     -- Select id, interval start and end, and value of the existing entry
196     -- in merged; all these fields may be null if the imported entry is
197     -- not adjacent to an existing one.
198     merged.id AS merged_id,
199     merged.stats_start AS merged_start,
200     merged.stats_end AS merged_end,
201     merged.val AS merged_val,
203     -- Select interval start and end and value of the newly imported
204     -- entry.
205     imported.stats_start AS imported_start,
206     imported.stats_end AS imported_end,
207     imported.val AS imported_val,
209     -- Select columns that define the group of entries that can be merged
210     -- in the merged table.
211     imported.fingerprint AS fingerprint,
212     imported.node AS node,
213     imported.metric AS metric,
214     imported.country AS country,
215     imported.transport AS transport,
216     imported.version AS version
218     -- Select these columns from all entries in the imported table, plus
219     -- do an outer join on the merged table to find adjacent entries that
220     -- we might want to merge the new entries with.  It's possible that we
221     -- handle the same imported entry twice, if it starts directly after
222     -- one existing entry and ends directly before another existing entry.
223     FROM imported LEFT JOIN merged
225     -- First two join conditions are to find adjacent intervals.  In fact,
226     -- we also include overlapping intervals here, so that we can skip the
227     -- overlapping entry in the imported table.
228     ON imported.stats_end >= merged.stats_start AND
229        imported.stats_start <= merged.stats_end AND
231        -- Further join conditions are same date, fingerprint, node, etc.,
232        -- so that we don't merge entries that don't belong together.
233        DATE(imported.stats_start) = DATE(merged.stats_start) AND
234        imported.fingerprint = merged.fingerprint AND
235        imported.node = merged.node AND
236        imported.metric = merged.metric AND
237        imported.country = merged.country AND
238        imported.transport = merged.transport AND
239        imported.version = merged.version
241     -- Ordering is key, or our approach to merge subsequent entries is
242     -- going to break.
243     ORDER BY imported.fingerprint, imported.node, imported.metric,
244              imported.country, imported.transport, imported.version,
245              imported.stats_start, merged.stats_start, imported.stats_end
247   -- Now go through the results one by one.
248   LOOP
250     -- Log that we're done with the query and about to start merging.
251     IF last_fingerprint IS NULL THEN
252       RAISE NOTICE '% Query returned, now merging entries.', timeofday();
253     END IF;
255     -- If we're processing the very first entry or if we have reached a
256     -- new group of entries that belong together, (re-)set last_*
257     -- variables.
258     IF last_fingerprint IS NULL OR
259         DATE(cur.imported_start) <> DATE(last_start) OR
260         cur.fingerprint <> last_fingerprint OR
261         cur.node <> last_node OR
262         cur.metric <> last_metric OR
263         cur.country <> last_country OR
264         cur.transport <> last_transport OR
265         cur.version <> last_version THEN
266       last_id := -1;
267       last_start := '1970-01-01 00:00:00';
268       last_end := '1970-01-01 00:00:00';
269       last_val := -1;
270     END IF;
272     -- Remember all fields that determine the group of which entries
273     -- belong together.
274     last_fingerprint := cur.fingerprint;
275     last_node := cur.node;
276     last_metric := cur.metric;
277     last_country := cur.country;
278     last_transport := cur.transport;
279     last_version := cur.version;
281     -- If the existing entry that we're currently looking at starts before
282     -- the previous entry ends, we have created two overlapping entries in
283     -- the last iteration, and that is not allowed.  Undo the previous
284     -- change.
285     IF cur.merged_start IS NOT NULL AND
286         cur.merged_start < last_end AND
287         undo_end IS NOT NULL AND undo_val IS NOT NULL THEN
288       UPDATE merged SET stats_end = undo_end, val = undo_val
289         WHERE id = last_id;
290       undo_end := NULL;
291       undo_val := NULL;
293     -- If there is no adjacent entry to the one we're about to merge,
294     -- insert it as new entry.
295     ELSIF cur.merged_end IS NULL THEN
296       IF cur.imported_start > last_end THEN
297         last_start := cur.imported_start;
298         last_end := cur.imported_end;
299         last_val := cur.imported_val;
300         INSERT INTO merged (fingerprint, node, metric, country, transport,
301                             version, stats_start, stats_end, val)
302           VALUES (last_fingerprint, last_node, last_metric, last_country,
303                   last_transport, last_version, last_start, last_end,
304                   last_val)
305           RETURNING id INTO last_id;
307       -- If there was no adjacent entry before starting to merge, but
308       -- there is now one ending right before the new entry starts, merge
309       -- the new entry into the existing one.
310       ELSIF cur.imported_start = last_end THEN
311         last_val := last_val + cur.imported_val;
312         last_end := cur.imported_end;
313         UPDATE merged SET stats_end = last_end, val = last_val
314           WHERE id = last_id;
315       END IF;
317       -- There's no risk of this entry overlapping with the next.
318       undo_end := NULL;
319       undo_val := NULL;
321     -- If the new entry ends right when an existing entry starts, but
322     -- there's a gap between when the previously processed entry ends and
323     -- when the new entry starts, merge the new entry with the existing
324     -- entry we're currently looking at.
325     ELSIF cur.imported_end = cur.merged_start THEN
326       IF cur.imported_start > last_end THEN
327         last_id := cur.merged_id;
328         last_start := cur.imported_start;
329         last_end := cur.merged_end;
330         last_val := cur.imported_val + cur.merged_val;
331         UPDATE merged SET stats_start = last_start, val = last_val
332           WHERE id = last_id;
334       -- If the new entry ends right when an existing entry starts and
335       -- there's no gap between when the previousl processed entry ends
336       -- and when the new entry starts, merge the new entry with the other
337       -- two entries.  This happens by deleting the previous entry and
338       -- expanding the subsequent entry to cover all three entries.
339       ELSIF cur.imported_start = last_end THEN
340         DELETE FROM merged WHERE id = last_id;
341         last_id := cur.merged_id;
342         last_end := cur.merged_end;
343         last_val := last_val + cur.merged_val;
344         UPDATE merged SET stats_start = last_start, val = last_val
345           WHERE id = last_id;
346       END IF;
348       -- There's no risk of this entry overlapping with the next.
349       undo_end := NULL;
350       undo_val := NULL;
352     -- If the new entry starts right when an existing entry ends, but
353     -- there's a gap between the previously processed entry and the
354     -- existing one, extend the existing entry.  There's a special case
355     -- when this operation is false and must be undone, which is when the
356     -- newly added entry overlaps with the subsequent entry.  That's why
357     -- we have to store the old interval end and value, so that this
358     -- operation can be undone in the next loop iteration.
359     ELSIF cur.imported_start = cur.merged_end THEN
360       IF last_end < cur.imported_start THEN
361         undo_end := cur.merged_end;
362         undo_val := cur.merged_val;
363         last_id := cur.merged_id;
364         last_start := cur.merged_start;
365         last_end := cur.imported_end;
366         last_val := cur.merged_val + cur.imported_val;
367         UPDATE merged SET stats_end = last_end, val = last_val
368           WHERE id = last_id;
370       -- If the new entry starts right when an existing entry ends and
371       -- there's no gap between the previously processed entry and the
372       -- existing entry, extend the existing entry.  This is very similar
373       -- to the previous case.  The same reasoning about possibly having
374       -- to undo this operation applies.
375       ELSE
376         undo_end := cur.merged_end;
377         undo_val := last_val;
378         last_end := cur.imported_end;
379         last_val := last_val + cur.imported_val;
380         UPDATE merged SET stats_end = last_end, val = last_val
381           WHERE id = last_id;
382       END IF;
384     -- If none of the cases above applies, there must have been an overlap
385     -- between the new entry and an existing one.  Skip the new entry.
386     ELSE
387       last_id := cur.merged_id;
388       last_start := cur.merged_start;
389       last_end := cur.merged_end;
390       last_val := cur.merged_val;
391       undo_end := NULL;
392       undo_val := NULL;
393     END IF;
394   END LOOP;
396   -- That's it, we're done merging.
397   RAISE NOTICE '% Finishing merge.', timeofday();
398   RETURN;
399 END;
400 $$ LANGUAGE plpgsql;
402 -- Aggregate user estimates for all dates that have updated entries in the
403 -- merged table.  This function first creates a temporary table with
404 -- new or updated observations, then removes all existing estimates for
405 -- the dates to be updated, and finally inserts newly computed aggregates
406 -- for these dates.
407 CREATE OR REPLACE FUNCTION aggregate() RETURNS VOID AS $$
408 BEGIN
409   RAISE NOTICE '% Starting aggregate step.', timeofday();
411   -- Create a new temporary table containing all relevant information
412   -- needed to update the aggregated table.  In this table, we sum up all
413   -- observations of a given type by reporting node.  This query is
414   -- (temporarily) materialized, because we need to combine its entries
415   -- multiple times in various ways.  A (non-materialized) view would have
416   -- meant to re-compute this query multiple times.
417   CREATE TEMPORARY TABLE update AS
418     SELECT fingerprint, node, metric, country, transport, version,
419            DATE(stats_start), SUM(val) AS val,
420            SUM(CAST(EXTRACT(EPOCH FROM stats_end - stats_start)
421                AS DOUBLE PRECISION)) AS seconds
422     FROM merged
423     WHERE DATE(stats_start) IN (
424           SELECT DISTINCT DATE(stats_start) FROM imported)
425     GROUP BY fingerprint, node, metric, country, transport, version,
426              DATE(stats_start);
428   -- Delete all entries from the aggregated table that we're about to
429   -- re-compute.
430   DELETE FROM aggregated WHERE date IN (SELECT DISTINCT date FROM update);
432   -- Insert partly empty results for all existing combinations of date,
433   -- node ('relay' or 'bridge'), country, transport, and version.  Only
434   -- the rrx and nrx fields will contain number and seconds of reported
435   -- responses for the given combination of date, node, etc., while the
436   -- other fields will be updated below.
437   INSERT INTO aggregated (date, node, country, transport, version, rrx,
438       nrx)
439     SELECT date, node, country, transport, version, SUM(val) AS rrx,
440     SUM(seconds) AS nrx
441     FROM update WHERE metric = 'responses'
442     GROUP BY date, node, country, transport, version;
444   -- Create another temporary table with only those entries that aren't
445   -- broken down by any dimension.  This table is much smaller, so the
446   -- following operations are much faster.
447   CREATE TEMPORARY TABLE update_no_dimensions AS
448     SELECT fingerprint, node, metric, date, val, seconds FROM update
449     WHERE country = ''
450     AND transport = ''
451     AND version = '';
453   -- Update results in the aggregated table by setting aggregates based
454   -- on reported directory bytes.  These aggregates are only based on
455   -- date and node, so that the same values are set for all combinations
456   -- of country, transport, and version.
457   UPDATE aggregated
458     SET hh = aggregated_bytes.hh, nh = aggregated_bytes.nh
459     FROM (
460       SELECT date, node, SUM(val) AS hh, SUM(seconds) AS nh
461       FROM update_no_dimensions
462       WHERE metric = 'bytes'
463       GROUP BY date, node
464     ) aggregated_bytes
465     WHERE aggregated.date = aggregated_bytes.date
466     AND aggregated.node = aggregated_bytes.node;
468   -- Update results based on nodes being contained in the network status.
469   UPDATE aggregated
470     SET nn = aggregated_status.nn
471     FROM (
472       SELECT date, node, SUM(seconds) AS nn
473       FROM update_no_dimensions
474       WHERE metric = 'status'
475       GROUP BY date, node
476     ) aggregated_status
477     WHERE aggregated.date = aggregated_status.date
478     AND aggregated.node = aggregated_status.node;
480   -- Update results based on nodes reporting both bytes and responses.
481   UPDATE aggregated
482     SET hrh = aggregated_bytes_responses.hrh
483     FROM (
484       SELECT bytes.date, bytes.node,
485              SUM((LEAST(bytes.seconds, responses.seconds)
486                  * bytes.val) / bytes.seconds) AS hrh
487       FROM update_no_dimensions bytes
488       LEFT JOIN update_no_dimensions responses
489       ON bytes.date = responses.date
490       AND bytes.fingerprint = responses.fingerprint
491       AND bytes.node = responses.node
492       WHERE bytes.metric = 'bytes'
493       AND responses.metric = 'responses'
494       GROUP BY bytes.date, bytes.node
495     ) aggregated_bytes_responses
496     WHERE aggregated.date = aggregated_bytes_responses.date
497     AND aggregated.node = aggregated_bytes_responses.node;
499   -- Update results based on notes reporting responses but no bytes.
500   UPDATE aggregated
501     SET nrh = aggregated_responses_bytes.nrh
502     FROM (
503       SELECT responses.date, responses.node,
504              SUM(GREATEST(0, responses.seconds
505                              - COALESCE(bytes.seconds, 0))) AS nrh
506       FROM update_no_dimensions responses
507       LEFT JOIN update_no_dimensions bytes
508       ON responses.date = bytes.date
509       AND responses.fingerprint = bytes.fingerprint
510       AND responses.node = bytes.node
511       WHERE responses.metric = 'responses'
512       AND bytes.metric = 'bytes'
513       GROUP BY responses.date, responses.node
514     ) aggregated_responses_bytes
515     WHERE aggregated.date = aggregated_responses_bytes.date
516     AND aggregated.node = aggregated_responses_bytes.node;
518   -- We're done aggregating new data.
519   RAISE NOTICE '% Finishing aggregate step.', timeofday();
520   RETURN;
521 END;
522 $$ LANGUAGE plpgsql;
524 -- User-friendly view on the aggregated table that implements the
525 -- algorithm proposed in Tor Tech Report 2012-10-001.  This view returns
526 -- user number estimates for both relay and bridge staistics, possibly
527 -- broken down by country or transport or version.
528 CREATE OR REPLACE VIEW estimated AS SELECT
530   -- The date of this user number estimate.
531   a.date,
533   -- The node type, which is either 'relay' or 'bridge'.
534   a.node,
536   -- The two-letter lower-case country code of this estimate; can be '??'
537   -- for an estimate of users that could not be resolved to any country,
538   -- or '' (empty string) for an estimate of all users, regardless of
539   -- country.
540   a.country,
542   -- The pluggable transport name of this estimate; can be '<OR>' for an
543   -- estimate of users that did not use any pluggable transport, '<??>'
544   -- for unknown pluggable transports, or '' (empty string) for an
545   -- estimate of all users, regardless of transport.
546   a.transport,
548   -- The IP address version of this estimate; can be 'v4' or 'v6', or ''
549   -- (empty string) for an estimate of all users, regardless of IP address
550   -- version.
551   a.version,
553   -- Estimated fraction of nodes reporting directory requests, which is
554   -- used to extrapolate observed requests to estimated total requests in
555   -- the network.  The closer this fraction is to 1.0, the more precise
556   -- the estimation.
557   CAST(a.frac * 100 AS INTEGER) AS frac,
559   -- Finally, the estimate number of users.
560   CAST(a.rrx / (a.frac * 10) AS INTEGER) AS users
562   -- Implement the estimation method in a subquery, so that the ugly
563   -- formula only has to be written once.
564   FROM (
565     SELECT date, node, country, transport, version, rrx, nrx,
566            (hrh * nh + hh * nrh) / (hh * nn) AS frac
567     FROM aggregated WHERE hh * nn > 0.0) a
569   -- Only include estimates with at least 10% of nodes reporting directory
570   -- request statistics.
571   WHERE a.frac BETWEEN 0.1 AND 1.0
573   -- Order results.
574   ORDER BY date DESC, node, version, transport, country;