explain why we put subprocess in the dict
[0tDNS.git] / src / perform_queries.py
blob1d20b13f00a9dff2a544ca36f2b942ff0bdcee2a
1 #!/bin/python3
3 from sys import argv
4 from threading import Thread
5 from time import sleep
6 import unbound
7 import psycopg2
9 # our own module used by several scripts in the project
10 from ztdnslib import start_db_connection, get_ztdns_config
class dns_queries:
    """Set of services that are to be resolved through one DNS server.

    Attributes:
        dns_IP: address of the DNS server (used as the forwarder in
            query_dns).
        dns_id: database id of that DNS server.
        services: iterable of (service id, service name) pairs, as
            unpacked by query_dns.
    """

    def __init__(self, dns_IP, dns_id, services):
        # Plain record: store the three constructor arguments unchanged.
        self.dns_IP, self.dns_id, self.services = dns_IP, dns_id, services
class single_query:
    """Everything the resolver callback needs to record one lookup.

    An instance is passed as the user data of an asynchronous resolution
    and read back in resolve_call_back.

    Attributes:
        hour: value written to the `date` column of the response row.
        cursor: database cursor the callback writes through.
        vpn_id: id of the VPN the lookup is performed through.
        dns_id: id of the DNS server being queried.
        service_id: id of the service (domain) being resolved.
    """

    def __init__(self, hour, cursor, vpn_id, dns_id, service_id):
        # Identification of the lookup ...
        self.vpn_id = vpn_id
        self.dns_id = dns_id
        self.service_id = service_id
        # ... and where/when its result is to be stored.
        self.hour = hour
        self.cursor = cursor
def query_planned_queries(cursor, hour, vpn_id):
    """Load the lookups planned for one VPN, grouped by DNS server.

    Args:
        cursor: open database cursor used for all SELECTs.
        hour: accepted for interface compatibility; not used here.
        vpn_id: id of the VPN whose planned queries are fetched.

    Returns:
        A list with one dns_queries object per distinct DNS server, each
        carrying the rows (service id, service name) to resolve through
        that server.  Shape of the result, by example:

            [
                # dns server IP | dns server id | service_id | service_name
                dns_queries("195.98.79.117", 23, [[89, "devuan.org"],
                                                  [44, "gry.pl"],
                                                  [112, "invidio.us"]]),
                dns_queries("192.71.245.208", 33, [[77, "debian.org"],
                                                   [22, "nie.ma.takiej.domeny"],
                                                   [100, "onet.pl"]])
            ]
    """
    # First pass: which DNS servers have anything planned for this VPN?
    cursor.execute('''
        SELECT DISTINCT d."IP", d.id
        FROM user_side_queries AS q JOIN user_side_dns AS d
        ON d.id = q.dns_id
        WHERE q.vpn_id = %s
        ''', (vpn_id,))
    servers = cursor.fetchall()

    def services_for(server_id):
        # Second pass, once per server: the services to ask it about.
        cursor.execute('''
            SELECT s.id, s.name
            FROM user_side_service AS s JOIN user_side_queries AS q
            ON s.id = q.service_id
            WHERE q.vpn_id = %s AND q.dns_id = %s
            ''', (vpn_id, server_id))
        return cursor.fetchall()

    return [dns_queries(server_ip, server_id, services_for(server_id))
            for server_ip, server_id in servers]
def resolve_call_back(mydata, status, result):
    """Store the outcome of one asynchronous DNS lookup in the database.

    Invoked by the unbound context for every resolution started with
    resolve_async.

    Args:
        mydata: the single_query object given to resolve_async; supplies
            the cursor and the ids written to the database.
        status: 0 when the resolver produced a result, nonzero on an
            internal resolver failure.
        result: the resolver's result object.  It is only dereferenced
            when present, because libunbound documents that no result is
            forthcoming when the status is nonzero.

    One row is inserted into user_side_responses; when the lookup returned
    data, one row per returned address is inserted into user_side_response.
    """
    query = mydata

    # debugging; guarded because a failed lookup may come without a result
    # object, and the failure still has to be recorded below
    if result is not None:
        print("callback called for {}".format(result.qname))

    if status != 0:
        result_info = 'internal failure: out of memory'
    elif result.rcode == 0:
        result_info = 'successful'
        print("Result:", result.data.address_list)
    elif result.rcode == 2:
        result_info = 'no response'
    elif result.rcode == 3:
        result_info = 'not exists'
    else:
        result_info = 'DNS error: {}'.format(result.rcode_str)

    # write to database
    try:
        query.cursor.execute('''
            INSERT INTO user_side_responses
            (date, result, dns_id, service_id, vpn_id)
            VALUES (%s, %s, %s, %s, %s)
            RETURNING id
            ''', (query.hour, result_info, query.dns_id,
                  query.service_id, query.vpn_id))

        responses_id = query.cursor.fetchone()[0]

        # status is checked first so `result` is never touched on failure
        if status == 0 and result.havedata:
            for address in result.data.address_list:
                query.cursor.execute('''
                    INSERT INTO user_side_response (returned_ip, responses_id)
                    VALUES(%s, %s)
                    ''', (address, responses_id))
    except psycopg2.IntegrityError:
        # Unique constraint is stopping us from adding duplicates;
        # This is most likely because back-end has been run multiple times
        # during the same hour (bad configuration or admin running manually
        # after cron)
        pass
    # no committing, since auto-commit mode is set on the connection
# Command-line interface: argv[1] is the hour stamped on every stored
# response, argv[2] the id of the VPN whose planned queries are performed.
hour, vpn_id = argv[1], argv[2]

# Project configuration; handed to start_db_connection for every connection.
config = get_ztdns_config()
def query_dns(dns_queries):
    """Resolve every service planned for one DNS server and store results.

    Runs as the body of a worker thread.  It opens its own database
    connection, forwards all lookups to the given DNS server and relies on
    resolve_call_back to write each answer to the database.

    Args:
        dns_queries: object with `dns_IP`, `dns_id` and `services`
            (iterable of (service id, service name) pairs).

    Uses the module-level `config`, `hour` and `vpn_id`.
    """
    connection = start_db_connection(config)
    try:
        cursor = connection.cursor()
        try:
            ctx = unbound.ub_ctx()
            ctx.set_fwd(dns_queries.dns_IP)

            for index, (service_id, service_name) in enumerate(
                    dns_queries.services):
                if index:
                    sleep(0.4)  # throttle between queries

                print("starting resolution of {} through {}".format(
                    service_name, dns_queries.dns_IP))
                query = single_query(hour, cursor, vpn_id,
                                     dns_queries.dns_id, service_id)

                ctx.resolve_async(service_name, query, resolve_call_back,
                                  unbound.RR_TYPE_A, unbound.RR_CLASS_IN)

            # Block until all outstanding lookups have been answered; the
            # callbacks use `cursor`, so it must stay open until here.
            ctx.wait()
        finally:
            # Release database resources even if resolution raised.
            cursor.close()
    finally:
        connection.close()
# Read the plan over a short-lived connection and release it right away,
# even if the query fails; each thread will make its own connection.
connection = start_db_connection(config)
try:
    cursor = connection.cursor()
    try:
        planned_queries = query_planned_queries(cursor, hour, vpn_id)
    finally:
        cursor.close()
finally:
    connection.close()

# One worker thread per DNS server.  The loop variable is deliberately not
# called `dns_queries`, so the class of that name is not rebound at module
# level.
threads = []
for planned in planned_queries:
    thread = Thread(target=query_dns, args=(planned,))
    thread.start()
    threads.append(thread)

# Wait for every server's lookups to finish.  The planning cursor and
# connection were already closed above, so nothing is left to release.
for thread in threads:
    thread.join()