add license
[0tDNS.git] / src / perform_queries.py
blob029651ba4696a07bab0fc8bb6d067929b9dacad9
1 #!/bin/python3
3 from sys import argv
4 from threading import Thread
5 from time import sleep
6 import unbound
7 import psycopg2
9 # our own module used by several scripts in the project
10 from ztdnslib import start_db_connection, get_ztdns_config, log, set_loghour
class dns_queries:
    """Batch of lookups that are to be sent through one DNS server.

    Plain value holder; the constructor stores its arguments unchanged:

    dns_IP   -- address of the DNS server (query_dns() passes it to the
                resolver context as the forwarder)
    dns_id   -- id of that server as selected from user_side_dns
    services -- iterable of (service id, service name) pairs to resolve
    """

    def __init__(self, dns_IP, dns_id, services):
        # the three attributes are independent of each other
        self.services = services
        self.dns_id = dns_id
        self.dns_IP = dns_IP
class single_query:
    """Context handed to the resolver callback for one DNS lookup.

    Plain value holder; the constructor stores its arguments unchanged:

    hour       -- value written to the `date` column of the response row
    cursor     -- database cursor the callback uses for its statements
    vpn_id     -- id of the vpn the lookup is performed through
    dns_id     -- id of the DNS server being queried
    service_id -- id of the service whose name is being resolved
    """

    def __init__(self, hour, cursor, vpn_id, dns_id, service_id):
        # identifiers of the planned query
        self.vpn_id = vpn_id
        self.dns_id = dns_id
        self.service_id = service_id
        # where and under which timestamp the outcome gets recorded
        self.cursor = cursor
        self.hour = hour
def query_planned_queries(cursor, hour, vpn_id):
    """Collect the lookups planned for one vpn, grouped by DNS server.

    cursor -- open database cursor used for the SELECT statements
    hour   -- accepted but not used by this function
    vpn_id -- id of the vpn whose rows in user_side_queries are read

    Returns a list with one dns_queries object per distinct DNS server
    that has planned queries for the vpn.  Each object's `services` holds
    the (service id, web address) rows exactly as cursor.fetchall()
    returned them.  Illustration of the shape of the result:

        [dns_queries("195.98.79.117", 23, [[89, "devuan.org"],
                                           [44, "gry.pl"],
                                           [112, "invidio.us"]]),
         dns_queries("192.71.245.208", 33, [[77, "debian.org"],
                                            [22, "nie.ma.takiej.domeny"],
                                            [100, "onet.pl"]])]
    """
    def services_planned_for(server_id):
        # services that are to be resolved through this particular server
        cursor.execute('''
        SELECT s.id, s.web_address
        FROM user_side_service AS s JOIN user_side_queries AS q
        ON s.id = q.service_id
        WHERE q.vpn_id = %s AND q.dns_id = %s
        ''', (vpn_id, server_id))
        return cursor.fetchall()

    # every DNS server that has at least one query planned for this vpn
    cursor.execute('''
    SELECT DISTINCT d."IP", d.id
    FROM user_side_queries AS q JOIN user_side_dns AS d
    ON d.id = q.dns_id
    WHERE q.vpn_id = %s
    ''', (vpn_id,))
    servers = cursor.fetchall()

    return [dns_queries(server_ip, server_id, services_planned_for(server_id))
            for server_ip, server_id in servers]
def _describe_result(status, result):
    """Return the text stored in user_side_responses.result for an outcome.

    `result` is only inspected when `status` is 0.
    """
    if status != 0:
        return 'internal failure: out of memory'
    if result.rcode == 0:
        return 'successful'
    if result.rcode == 2:
        return 'no response'
    if result.rcode == 3:
        return 'not exists'
    return 'DNS error: {}'.format(result.rcode_str)


def resolve_call_back(mydata, status, result):
    """Record the outcome of one asynchronous lookup in the database.

    Passed to ctx.resolve_async() as the completion callback.

    mydata -- the single_query object given to resolve_async(); supplies
              the cursor and the ids/hour written to the database
    status -- 0 if the resolver produced a result, non-zero on a resolver
              failure
    result -- the resolver's result object; it is not touched when status
              is non-zero, because no result is delivered in that case

    Sets the module-level flag `dups` when the database rejects the row
    with an integrity error.
    """
    global dups

    query = mydata
    # result.data is None when the answer carries no records, so it may
    # only be dereferenced after checking havedata
    have_answer = status == 0 and bool(result.havedata)

    # debugging
    if status == 0:
        print("callback called for {}".format(result.qname))
        if result.rcode == 0 and have_answer:
            print("Result:", result.data.address_list)
    else:
        print("callback called with error status {}".format(status))

    result_info = _describe_result(status, result)

    # write to database
    try:
        # the response row and the validity bookkeeping go in one transaction
        query.cursor.connection.autocommit = False
        query.cursor.execute('''
        INSERT INTO user_side_responses
            (date, result, dns_id, service_id, vpn_id)
        VALUES (%s, %s, %s, %s, %s)
        RETURNING id
        ''', (query.hour, result_info, query.dns_id,
              query.service_id, query.vpn_id))

        responses_id = query.cursor.fetchone()[0]

        if status == 0:
            # an even better solution would be to have a trigger delete
            # the record when validity reaches 0
            query.cursor.execute('''
            UPDATE user_side_queries
            SET validity = validity - 1
            WHERE dns_id = %s AND service_id = %s AND vpn_id = %s;

            DELETE FROM user_side_queries
            WHERE dns_id = %s AND service_id = %s AND vpn_id = %s AND
                  validity < 1;
            ''', (query.dns_id, query.service_id, query.vpn_id,
                  query.dns_id, query.service_id, query.vpn_id))

        query.cursor.connection.commit()
        query.cursor.connection.autocommit = True

        # returned addresses are stored after the commit, in autocommit mode
        if have_answer:
            for address in result.data.address_list:
                query.cursor.execute('''
                INSERT INTO user_side_response (returned_ip, responses_id)
                VALUES(%s, %s)
                ''', (address, responses_id))

    except psycopg2.IntegrityError:
        query.cursor.connection.rollback()
        # Unique constraint is stopping us from adding duplicates;
        # This is most likely because back-end has been run multiple times
        # during the same hour (bad configuration or admin running manually
        # after cron), we'll write to logs about that.
        dups = True
# Run parameters, read by the functions in this file as module globals.
# Command line: <hour> <vpn_id>
hour = argv[1]
# log() function will now prepend messages with hour
set_loghour(hour)
vpn_id = argv[2]
config = get_ztdns_config()

# Set to True by resolve_call_back() when the database refuses a response
# row with an integrity error; checked once all threads have finished.
dups = False
def query_dns(dns_queries):
    """Resolve every planned service name through one DNS server.

    dns_queries -- object providing dns_IP, dns_id and services (an
                   iterable of (service id, service name) pairs)

    Opens its own database connection, submits one asynchronous A/IN
    lookup per service with resolve_call_back() as the completion
    callback, waits for the lookups to finish and closes the cursor and
    the connection again, also when an exception interrupts the work.
    """
    connection = start_db_connection(config)
    try:
        cursor = connection.cursor()
        try:
            ctx = unbound.ub_ctx()
            ctx.set_fwd(dns_queries.dns_IP)

            for position, (service_id, service_name) in enumerate(
                    dns_queries.services):
                if position:
                    sleep(0.4) # throttle between queries

                print("starting resolution of {} through {}".format(
                    service_name, dns_queries.dns_IP))
                query = single_query(hour, cursor, vpn_id,
                                     dns_queries.dns_id, service_id)

                status, _async_id = ctx.resolve_async(
                    service_name, query, resolve_call_back,
                    unbound.RR_TYPE_A, unbound.RR_CLASS_IN)
                if status != 0:
                    # the lookup was never submitted, so the callback will
                    # not run for it; at least make that visible
                    print("could not start resolution of {}: {}".format(
                        service_name, unbound.ub_strerror(status)))

            ctx.wait()
        finally:
            cursor.close()
    finally:
        connection.close()
# Read the plan over a short-lived connection and release it even if the
# planning query fails; each thread will make its own connection.
connection = start_db_connection(config)
try:
    cursor = connection.cursor()
    try:
        planned_queries = query_planned_queries(cursor, hour, vpn_id)
    finally:
        cursor.close()
finally:
    connection.close()

# One worker thread per DNS server.  The loop variable deliberately does not
# reuse the name `dns_queries`, which would rebind the class of that name.
threads = []
for planned in planned_queries:
    thread = Thread(target=query_dns, args=(planned,))
    thread.start()
    threads.append(thread)

for thread in threads:
    thread.join()

# resolve_call_back() raises this flag when the database rejected a row
if dups:
    log('results already exist for vpn {}'.format(vpn_id))