-- Copyright 2013 The Tor Project
-- See LICENSE for licensing information

-- Use enum types for dimensions that may only change if we write new code
-- to support them. For example, if there's a new node type beyond relay
-- and bridge, we'll have to write code to support it. This is in
-- contrast to dimensions like country, transport, or version which don't
-- have their possible values hard-coded anywhere.
CREATE TYPE node AS ENUM ('relay', 'bridge');
CREATE TYPE metric AS ENUM ('responses', 'bytes', 'status');
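
-- As a sketch (not executed as part of this schema): if we ever add a
-- node type beyond relay and bridge, the enum would be extended with a
-- migration like the following, in addition to the new code mentioned
-- above:
--
--   ALTER TYPE node ADD VALUE 'hypothetical_new_type';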

-- All new data first goes into the imported table. The import tool
-- should do some trivial checks for invalid or duplicate data, but
-- ultimately, we're going to do these checks in the database. For
-- example, the import tool could avoid importing data from the same
-- descriptor more than once, but it's fine to import the same history
-- string from distinct descriptors multiple times. The import tool must,
-- however, make sure that stats_end is not greater than 00:00:00 of the
-- day following stats_start. There are no constraints set on this table,
-- because importing data should be really, really fast. Once the newly
-- imported data is successfully processed, the imported table is emptied.
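-- For example, an interval starting at 2013-04-11 22:00:00 may end no
-- later than 2013-04-12 00:00:00; presumably the import tool splits
-- longer history intervals at midnight before importing them.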
CREATE TABLE imported (

  -- The 40-character upper-case hex string identifies a descriptor
  -- uniquely and is used to join metrics (responses, bytes, status)
  -- published by the same node (relay or bridge).
  fingerprint CHARACTER(40) NOT NULL,

  -- The node type is used to decide the statistics that this entry will
  -- be included in.
  node node NOT NULL,

  -- The metric of this entry describes the stored observation type.
  -- We'll want to store different metrics published by a node:
  -- - 'responses' are the number of v3 network status consensus requests
  --   that the node responded to;
  -- - 'bytes' are the number of bytes that the node wrote when answering
  --   directory requests;
  -- - 'status' are the intervals when the node was listed as running in
  --   the network status published by either the directory authorities or
  --   bridge authority.
  metric metric NOT NULL,

  -- The two-letter lower-case country code that the observation in this
  -- entry can be attributed to; can be '??' if no country information is
  -- known for this entry, or '' (empty string) if this entry summarizes
  -- observations for all countries.
  country CHARACTER VARYING(2) NOT NULL,

  -- The pluggable transport name that the observation in this entry can
  -- be attributed to; can be '<OR>' if no pluggable transport was used,
  -- '<??>' if an unknown pluggable transport was used, or '' (empty
  -- string) if this entry summarizes observations for all transports.
  transport CHARACTER VARYING(20) NOT NULL,

  -- The IP address version that the observation in this entry can be
  -- attributed to; can be 'v4' or 'v6' or '' (empty string) if this entry
  -- summarizes observations for all IP address versions.
  version CHARACTER VARYING(2) NOT NULL,

  -- The interval start of this observation.
  stats_start TIMESTAMP WITHOUT TIME ZONE NOT NULL,

  -- The interval end of this observation. This timestamp must be greater
  -- than stats_start and must not be greater than 00:00:00 of the day
  -- following stats_start, which the import tool must ensure.
  stats_end TIMESTAMP WITHOUT TIME ZONE NOT NULL,

  -- Finally, the observed value.
  val DOUBLE PRECISION NOT NULL
);
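
-- As an illustration (sketch only, with invented values), the import
-- tool would add rows like the following:
--
--   INSERT INTO imported (fingerprint, node, metric, country, transport,
--     version, stats_start, stats_end, val)
--   VALUES ('0123456789ABCDEF0123456789ABCDEF01234567', 'relay',
--     'responses', 'us', '', '', '2013-04-11 12:00:00',
--     '2013-04-11 18:00:00', 4224);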

-- After importing new data into the imported table, they are merged into
-- the merged table using the merge() function. The merged table contains
-- the same data as the imported table, except:
-- (1) there are no duplicate or overlapping entries in the merged table
--     with respect to stats_start and stats_end and the same fingerprint,
--     node, metric, country, transport, and version columns;
-- (2) all subsequent intervals with the same node, metric, country,
--     transport, version, and stats_start date are compressed into a
--     single entry.
CREATE TABLE merged (

  -- The unique key that is only used when merging newly imported data
  -- into this table.
  id SERIAL PRIMARY KEY,

  -- All other columns have the same meaning as in the imported table.
  fingerprint CHARACTER(40) NOT NULL,
  node node NOT NULL,
  metric metric NOT NULL,
  country CHARACTER VARYING(2) NOT NULL,
  transport CHARACTER VARYING(20) NOT NULL,
  version CHARACTER VARYING(2) NOT NULL,
  stats_start TIMESTAMP WITHOUT TIME ZONE NOT NULL,
  stats_end TIMESTAMP WITHOUT TIME ZONE NOT NULL,
  val DOUBLE PRECISION NOT NULL
);
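
-- For example (hypothetical values): two adjacent imported entries for
-- the same relay and metric, one covering 12:00:00 to 14:00:00 with val
-- 10 and one covering 14:00:00 to 16:00:00 with val 20, are compressed
-- into a single merged entry covering 12:00:00 to 16:00:00 with val 30.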

-- After merging new data into the merged table, they are aggregated to
-- daily user number estimates using the aggregate() function. Only dates
-- with new data in the imported table will be recomputed in the
-- aggregated table. The aggregated components follow the algorithm
-- proposed in Tor Tech Report 2012-10-001.
CREATE TABLE aggregated (

  -- The date of these aggregated observations.
  date DATE NOT NULL,

  -- The node, country, transport, and version columns all have the same
  -- meaning as in the imported table.
  node node NOT NULL,
  country CHARACTER VARYING(2) NOT NULL DEFAULT '',
  transport CHARACTER VARYING(20) NOT NULL DEFAULT '',
  version CHARACTER VARYING(2) NOT NULL DEFAULT '',

  -- Total number of reported responses, possibly broken down by country,
  -- transport, or version if any of them is not ''. See r(R) in the
  -- tech report.
  rrx DOUBLE PRECISION NOT NULL DEFAULT 0,

  -- Total number of seconds of nodes reporting responses, possibly broken
  -- down by country, transport, or version if any of them is not ''.
  -- This would be referred to as n(R) in the tech report, though it's not
  -- used there.
  nrx DOUBLE PRECISION NOT NULL DEFAULT 0,

  -- Total number of reported bytes. See h(H) in the tech report.
  hh DOUBLE PRECISION NOT NULL DEFAULT 0,

  -- Total number of seconds of nodes in the status. See n(N) in the
  -- tech report.
  nn DOUBLE PRECISION NOT NULL DEFAULT 0,

  -- Number of reported bytes of nodes that reported both responses and
  -- bytes. See h(R intersect H) in the tech report.
  hrh DOUBLE PRECISION NOT NULL DEFAULT 0,

  -- Number of seconds of nodes reporting bytes. See n(H) in the tech
  -- report.
  nh DOUBLE PRECISION NOT NULL DEFAULT 0,

  -- Number of seconds of nodes reporting responses but no bytes. See
  -- n(R \ H) in the tech report.
  nrh DOUBLE PRECISION NOT NULL DEFAULT 0
);
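
-- Putting these components together: the estimated view below computes
-- the fraction of total directory traffic observed as
--
--   frac = (hrh * nh + hh * nrh) / (hh * nn)
--
-- and extrapolates reported responses rrx to the whole network by
-- dividing by frac, following the tech report.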

CREATE LANGUAGE plpgsql;

-- Merge new entries from the imported table into the merged table, and
-- compress them while doing so. This function first executes a query to
-- match all entries in the imported table with adjacent or even
-- overlapping entries in the merged table. It then loops over query
-- results and either inserts or updates entries in the merged table. The
-- idea is to leave query optimization to the database and only touch
-- as few entries as possible while running this function.
CREATE OR REPLACE FUNCTION merge() RETURNS VOID AS $$
DECLARE

  -- The current record that we're handling in the loop body.
  cur RECORD;

  -- Various information about the last record we processed, so that we
  -- can merge the current record with the last one if possible.
  last_fingerprint CHARACTER(40) := NULL;
  last_node node;
  last_metric metric;
  last_country CHARACTER VARYING(2);
  last_transport CHARACTER VARYING(20);
  last_version CHARACTER VARYING(2);
  last_start TIMESTAMP WITHOUT TIME ZONE;
  last_end TIMESTAMP WITHOUT TIME ZONE;
  last_id INTEGER;
  last_val DOUBLE PRECISION;

  -- Interval end and value of the last record before updating them in the
  -- last loop step. In a few edge cases, we may update an entry and
  -- learn in the next loop step that the updated entry overlaps with the
  -- subsequent entry. In these cases we'll have to undo the update,
  -- which is why we're storing the updated values.
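  -- For example (hypothetical intervals): after extending an existing
  -- entry ending at 12:00:00 with an imported entry ending at 14:00:00,
  -- the next query row may reveal another existing entry starting at
  -- 13:00:00; the extension has created an overlap and is undone using
  -- the values stored here.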
  undo_end TIMESTAMP WITHOUT TIME ZONE;
  undo_val DOUBLE PRECISION;

BEGIN

  RAISE NOTICE '% Starting to merge.', timeofday();

  -- TODO Maybe we'll have to materialize a merged_part table that only
  -- contains dates IN (SELECT DISTINCT DATE(stats_start) FROM imported)
  -- and use that in the query below.

  -- Loop over results from a query that joins new entries in the imported
  -- table with existing entries in the merged table.
  FOR cur IN SELECT DISTINCT

    -- Select id, interval start and end, and value of the existing entry
    -- in merged; all these fields may be null if the imported entry is
    -- not adjacent to an existing one.
    merged.id AS merged_id,
    merged.stats_start AS merged_start,
    merged.stats_end AS merged_end,
    merged.val AS merged_val,

    -- Select interval start and end and value of the newly imported
    -- entry.
    imported.stats_start AS imported_start,
    imported.stats_end AS imported_end,
    imported.val AS imported_val,

    -- Select columns that define the group of entries that can be merged
    -- in the merged table.
    imported.fingerprint AS fingerprint,
    imported.node AS node,
    imported.metric AS metric,
    imported.country AS country,
    imported.transport AS transport,
    imported.version AS version

    -- Select these columns from all entries in the imported table, plus
    -- do an outer join on the merged table to find adjacent entries that
    -- we might want to merge the new entries with. It's possible that we
    -- handle the same imported entry twice, if it starts directly after
    -- one existing entry and ends directly before another existing entry.
    FROM imported LEFT JOIN merged

    -- First two join conditions are to find adjacent intervals. In fact,
    -- we also include overlapping intervals here, so that we can skip the
    -- overlapping entry in the imported table.
    ON imported.stats_end >= merged.stats_start AND
       imported.stats_start <= merged.stats_end AND

    -- Further join conditions are same date, fingerprint, node, etc.,
    -- so that we don't merge entries that don't belong together.
       DATE(imported.stats_start) = DATE(merged.stats_start) AND
       imported.fingerprint = merged.fingerprint AND
       imported.node = merged.node AND
       imported.metric = merged.metric AND
       imported.country = merged.country AND
       imported.transport = merged.transport AND
       imported.version = merged.version

    -- Ordering is key, or our approach to merge subsequent entries is
    -- going to break.
    ORDER BY imported.fingerprint, imported.node, imported.metric,
        imported.country, imported.transport, imported.version,
        imported.stats_start, merged.stats_start, imported.stats_end

  -- Now go through the results one by one.
  LOOP

    -- Log that we're done with the query and about to start merging.
    IF last_fingerprint IS NULL THEN
      RAISE NOTICE '% Query returned, now merging entries.', timeofday();
    END IF;

    -- If we're processing the very first entry or if we have reached a
    -- new group of entries that belong together, (re-)set last_*
    -- variables.
    IF last_fingerprint IS NULL OR
        DATE(cur.imported_start) <> DATE(last_start) OR
        cur.fingerprint <> last_fingerprint OR
        cur.node <> last_node OR
        cur.metric <> last_metric OR
        cur.country <> last_country OR
        cur.transport <> last_transport OR
        cur.version <> last_version THEN

      last_start := '1970-01-01 00:00:00';
      last_end := '1970-01-01 00:00:00';
      last_id := -1;
      last_val := 0;

      -- Remember all fields that determine the group of which entries
      -- belong together.
      last_fingerprint := cur.fingerprint;
      last_node := cur.node;
      last_metric := cur.metric;
      last_country := cur.country;
      last_transport := cur.transport;
      last_version := cur.version;
    END IF;

    -- If the existing entry that we're currently looking at starts before
    -- the previous entry ends, we have created two overlapping entries in
    -- the last iteration, and that is not allowed. Undo the previous
    -- change.
    IF cur.merged_start IS NOT NULL AND
        cur.merged_start < last_end AND
        undo_end IS NOT NULL AND undo_val IS NOT NULL THEN
      UPDATE merged SET stats_end = undo_end, val = undo_val
        WHERE id = last_id;
      undo_end := NULL;
      undo_val := NULL;

    -- If there is no adjacent entry to the one we're about to merge,
    -- insert it as new entry.
    ELSIF cur.merged_end IS NULL THEN
      IF cur.imported_start > last_end THEN
        last_start := cur.imported_start;
        last_end := cur.imported_end;
        last_val := cur.imported_val;
        INSERT INTO merged (fingerprint, node, metric, country, transport,
            version, stats_start, stats_end, val)
          VALUES (last_fingerprint, last_node, last_metric, last_country,
            last_transport, last_version, last_start, last_end,
            last_val)
          RETURNING id INTO last_id;

      -- If there was no adjacent entry before starting to merge, but
      -- there is now one ending right before the new entry starts, merge
      -- the new entry into the existing one.
      ELSIF cur.imported_start = last_end THEN
        last_val := last_val + cur.imported_val;
        last_end := cur.imported_end;
        UPDATE merged SET stats_end = last_end, val = last_val
          WHERE id = last_id;
      END IF;

      -- There's no risk of this entry overlapping with the next.
      undo_end := NULL;
      undo_val := NULL;

    -- If the new entry ends right when an existing entry starts, but
    -- there's a gap between when the previously processed entry ends and
    -- when the new entry starts, merge the new entry with the existing
    -- entry we're currently looking at.
    ELSIF cur.imported_end = cur.merged_start THEN
      IF cur.imported_start > last_end THEN
        last_id := cur.merged_id;
        last_start := cur.imported_start;
        last_end := cur.merged_end;
        last_val := cur.imported_val + cur.merged_val;
        UPDATE merged SET stats_start = last_start, val = last_val
          WHERE id = last_id;

      -- If the new entry ends right when an existing entry starts and
      -- there's no gap between when the previously processed entry ends
      -- and when the new entry starts, merge the new entry with the other
      -- two entries. This happens by deleting the previous entry and
      -- expanding the subsequent entry to cover all three entries.
      ELSIF cur.imported_start = last_end THEN
        DELETE FROM merged WHERE id = last_id;
        last_id := cur.merged_id;
        last_end := cur.merged_end;
        last_val := last_val + cur.merged_val;
        UPDATE merged SET stats_start = last_start, val = last_val
          WHERE id = last_id;
      END IF;

      -- There's no risk of this entry overlapping with the next.
      undo_end := NULL;
      undo_val := NULL;

    -- If the new entry starts right when an existing entry ends, but
    -- there's a gap between the previously processed entry and the
    -- existing one, extend the existing entry. There's a special case
    -- when this operation is wrong and must be undone, which is when the
    -- newly added entry overlaps with the subsequent entry. That's why
    -- we have to store the old interval end and value, so that this
    -- operation can be undone in the next loop iteration.
    ELSIF cur.imported_start = cur.merged_end THEN
      IF last_end < cur.imported_start THEN
        undo_end := cur.merged_end;
        undo_val := cur.merged_val;
        last_id := cur.merged_id;
        last_start := cur.merged_start;
        last_end := cur.imported_end;
        last_val := cur.merged_val + cur.imported_val;
        UPDATE merged SET stats_end = last_end, val = last_val
          WHERE id = last_id;

      -- If the new entry starts right when an existing entry ends and
      -- there's no gap between the previously processed entry and the
      -- existing entry, extend the existing entry. This is very similar
      -- to the previous case. The same reasoning about possibly having
      -- to undo this operation applies.
      ELSE
        undo_end := cur.merged_end;
        undo_val := last_val;
        last_end := cur.imported_end;
        last_val := last_val + cur.imported_val;
        UPDATE merged SET stats_end = last_end, val = last_val
          WHERE id = last_id;
      END IF;

    -- If none of the cases above applies, there must have been an overlap
    -- between the new entry and an existing one. Skip the new entry.
    ELSE
      last_id := cur.merged_id;
      last_start := cur.merged_start;
      last_end := cur.merged_end;
      last_val := cur.merged_val;
    END IF;
  END LOOP;

  -- That's it, we're done merging.
  RAISE NOTICE '% Finishing merge.', timeofday();
  RETURN;
END;
$$ LANGUAGE plpgsql;
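
-- A minimal usage sketch: merge() takes no arguments and is invoked
-- after the import tool has filled the imported table:
--
--   SELECT merge();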

-- Aggregate user estimates for all dates that have updated entries in the
-- merged table. This function first creates a temporary table with
-- new or updated observations, then removes all existing estimates for
-- the dates to be updated, and finally inserts newly computed aggregates
-- into the aggregated table.
CREATE OR REPLACE FUNCTION aggregate() RETURNS VOID AS $$
BEGIN

  RAISE NOTICE '% Starting aggregate step.', timeofday();

  -- Create a new temporary table containing all relevant information
  -- needed to update the aggregated table. In this table, we sum up all
  -- observations of a given type by reporting node. This query is
  -- (temporarily) materialized, because we need to combine its entries
  -- multiple times in various ways. A (non-materialized) view would have
  -- meant re-computing this query multiple times.
  CREATE TEMPORARY TABLE update AS
    SELECT fingerprint, node, metric, country, transport, version,
        DATE(stats_start), SUM(val) AS val,
        SUM(CAST(EXTRACT(EPOCH FROM stats_end - stats_start)
            AS DOUBLE PRECISION)) AS seconds
    FROM merged
    WHERE DATE(stats_start) IN (
        SELECT DISTINCT DATE(stats_start) FROM imported)
    GROUP BY fingerprint, node, metric, country, transport, version,
        DATE(stats_start);

  -- Delete all entries from the aggregated table that we're about to
  -- replace.
  DELETE FROM aggregated WHERE date IN (SELECT DISTINCT date FROM update);

  -- Insert partly empty results for all existing combinations of date,
  -- node ('relay' or 'bridge'), country, transport, and version. Only
  -- the rrx and nrx fields will contain number and seconds of reported
  -- responses for the given combination of date, node, etc., while the
  -- other fields will be updated below.
  INSERT INTO aggregated (date, node, country, transport, version, rrx,
      nrx)
    SELECT date, node, country, transport, version, SUM(val) AS rrx,
        SUM(seconds) AS nrx
    FROM update WHERE metric = 'responses'
    GROUP BY date, node, country, transport, version;

  -- Create another temporary table with only those entries that aren't
  -- broken down by any dimension. This table is much smaller, so the
  -- following operations are much faster.
  CREATE TEMPORARY TABLE update_no_dimensions AS
    SELECT fingerprint, node, metric, date, val, seconds FROM update
    WHERE country = ''
    AND transport = ''
    AND version = '';

  -- Update results in the aggregated table by setting aggregates based
  -- on reported directory bytes. These aggregates are only based on
  -- date and node, so that the same values are set for all combinations
  -- of country, transport, and version.
  UPDATE aggregated
  SET hh = aggregated_bytes.hh, nh = aggregated_bytes.nh
  FROM (
    SELECT date, node, SUM(val) AS hh, SUM(seconds) AS nh
    FROM update_no_dimensions
    WHERE metric = 'bytes'
    GROUP BY date, node
  ) aggregated_bytes
  WHERE aggregated.date = aggregated_bytes.date
  AND aggregated.node = aggregated_bytes.node;

  -- Update results based on nodes being contained in the network status.
  UPDATE aggregated
  SET nn = aggregated_status.nn
  FROM (
    SELECT date, node, SUM(seconds) AS nn
    FROM update_no_dimensions
    WHERE metric = 'status'
    GROUP BY date, node
  ) aggregated_status
  WHERE aggregated.date = aggregated_status.date
  AND aggregated.node = aggregated_status.node;

  -- Update results based on nodes reporting both bytes and responses.
  UPDATE aggregated
  SET hrh = aggregated_bytes_responses.hrh
  FROM (
    SELECT bytes.date, bytes.node,
        SUM((LEAST(bytes.seconds, responses.seconds)
            * bytes.val) / bytes.seconds) AS hrh
    FROM update_no_dimensions bytes
    LEFT JOIN update_no_dimensions responses
    ON bytes.date = responses.date
    AND bytes.fingerprint = responses.fingerprint
    AND bytes.node = responses.node
    WHERE bytes.metric = 'bytes'
    AND responses.metric = 'responses'
    GROUP BY bytes.date, bytes.node
  ) aggregated_bytes_responses
  WHERE aggregated.date = aggregated_bytes_responses.date
  AND aggregated.node = aggregated_bytes_responses.node;

  -- Update results based on nodes reporting responses but no bytes. The
  -- bytes.metric condition needs to be part of the join condition, not
  -- the WHERE clause, or the outer join would degenerate to an inner
  -- join and drop exactly the nodes we want to count here.
  UPDATE aggregated
  SET nrh = aggregated_responses_bytes.nrh
  FROM (
    SELECT responses.date, responses.node,
        SUM(GREATEST(0, responses.seconds
            - COALESCE(bytes.seconds, 0))) AS nrh
    FROM update_no_dimensions responses
    LEFT JOIN update_no_dimensions bytes
    ON responses.date = bytes.date
    AND responses.fingerprint = bytes.fingerprint
    AND responses.node = bytes.node
    AND bytes.metric = 'bytes'
    WHERE responses.metric = 'responses'
    GROUP BY responses.date, responses.node
  ) aggregated_responses_bytes
  WHERE aggregated.date = aggregated_responses_bytes.date
  AND aggregated.node = aggregated_responses_bytes.node;

  -- We're done aggregating new data.
  RAISE NOTICE '% Finishing aggregate step.', timeofday();
  RETURN;
END;
$$ LANGUAGE plpgsql;
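
-- A complete processing run might then look like this (sketch; the
-- exact sequence is up to the import tool, which is also responsible
-- for emptying the imported table afterwards):
--
--   BEGIN;
--   -- ... import tool copies new data into the imported table ...
--   SELECT merge();
--   SELECT aggregate();
--   DELETE FROM imported;
--   COMMIT;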

-- User-friendly view on the aggregated table that implements the
-- algorithm proposed in Tor Tech Report 2012-10-001. This view returns
-- user number estimates for both relay and bridge statistics, possibly
-- broken down by country or transport or version.
CREATE OR REPLACE VIEW estimated AS SELECT

  -- The date of this user number estimate.
  a.date,

  -- The node type, which is either 'relay' or 'bridge'.
  a.node,

  -- The two-letter lower-case country code of this estimate; can be '??'
  -- for an estimate of users that could not be resolved to any country,
  -- or '' (empty string) for an estimate of all users, regardless of
  -- country.
  a.country,

  -- The pluggable transport name of this estimate; can be '<OR>' for an
  -- estimate of users that did not use any pluggable transport, '<??>'
  -- for unknown pluggable transports, or '' (empty string) for an
  -- estimate of all users, regardless of transport.
  a.transport,

  -- The IP address version of this estimate; can be 'v4' or 'v6', or ''
  -- (empty string) for an estimate of all users, regardless of IP address
  -- version.
  a.version,

  -- Estimated fraction of nodes reporting directory requests, which is
  -- used to extrapolate observed requests to estimated total requests in
  -- the network. The closer this fraction is to 1.0, the more precise
  -- the estimate. This column reports the fraction as a percentage.
  CAST(a.frac * 100 AS INTEGER) AS frac,

  -- Finally, the estimated number of users, obtained by extrapolating
  -- observed requests to the whole network (dividing by frac) and
  -- assuming that an average client sends about 10 requests per day.
  CAST(a.rrx / (a.frac * 10) AS INTEGER) AS users

  -- Implement the estimation method in a subquery, so that the ugly
  -- formula only has to be written once.
  FROM (
    SELECT date, node, country, transport, version, rrx, nrx,
        (hrh * nh + hh * nrh) / (hh * nn) AS frac
    FROM aggregated WHERE hh * nn > 0.0) a

  -- Only include estimates with at least 10% of nodes reporting directory
  -- request statistics.
  WHERE a.frac BETWEEN 0.1 AND 1.0

  -- Order results.
  ORDER BY date DESC, node, version, transport, country;
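
-- Example query against this view (sketch, with invented numbers):
-- daily bridge user estimates for a single country, say 'sy', would be
-- obtained with
--
--   SELECT date, users FROM estimated
--   WHERE node = 'bridge' AND country = 'sy'
--   AND transport = '' AND version = '';
--
-- To illustrate the arithmetic: with rrx = 10000 reported responses and
-- an observed fraction a.frac of 0.5, the view returns
-- users = 10000 / (0.5 * 10) = 2000.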