4 from threading
import Thread
9 # our own module used by several scripts in the project
10 from ztdnslib
import start_db_connection
, get_ztdns_config
# Constructor taking a DNS server's IP, that server's id and a collection of
# services. Only the `services` assignment is visible in this excerpt; how
# dns_IP and dns_id are stored is not shown here.
# NOTE(review): presumably this is the constructor of `dns_queries`, which is
# called elsewhere in this file with (dns_IP, dns_id, fetched rows) - confirm
# against the full file.
13 def __init__(self
, dns_IP
, dns_id
, services
):
# NOTE(review): a `.services` attribute is later unpacked as
# (service_id, service_name) pairs in query_dns - confirm it is this one.
16 self
.services
= services
# Constructor taking the hour, a database cursor and the ids of the VPN, the
# DNS server and the service. Only the `service_id` assignment is visible in
# this excerpt; how the other arguments are stored is not shown here.
# NOTE(review): presumably this is the constructor of `single_query`, which
# query_dns calls with these five arguments in this order - confirm against
# the full file.
19 def __init__(self
, hour
, cursor
, vpn_id
, dns_id
, service_id
):
24 self
.service_id
= service_id
# Collect the DNS queries to perform, grouped per DNS server.
#
# Visible steps: rows of (IP, id) selected from user_side_queries joined with
# user_side_dns are fetched into `dnss`; for each such row a second query
# joins user_side_service with user_side_queries, filtered by vpn_id and
# dns_id, and its rows are wrapped in a dns_queries object that is appended
# to `dnss_to_query`.
#
# Parameters:
#   cursor - database cursor on which the queries are executed
#   hour   - not used by any statement visible in this excerpt
#   vpn_id - id of the VPN; bound as a parameter of the per-server query
#
# NOTE(review): the creation of `dnss_to_query`, the remaining parts of both
# SQL statements and the return statement are not visible in this excerpt;
# presumably `dnss_to_query` is returned - confirm against the full file.
26 def query_planned_queries(cursor
, hour
, vpn_id
):
28 # # dns server IP | dns server id | service_id | service_name
29 # dns_queries("195.98.79.117", 23, [[89, "devuan.org"],
31 # [112, "invidio.us"]]),
32 # dns_queries("192.71.245.208", 33, [[77, "debian.org"],
33 # [22, "nie.ma.takiej.domeny"],
37 SELECT DISTINCT d."IP", d.id
38 FROM user_side_queries AS q JOIN user_side_dns AS d
42 dnss
= cursor
.fetchall()
# one pass per fetched (IP, id) row
46 for dns_IP
, dns_id
in dnss
:
49 FROM user_side_service AS s JOIN user_side_queries AS q
50 ON s.id = q.service_id
51 WHERE q.vpn_id = %s AND q.dns_id = %s
52 ''', (vpn_id
, dns_id
))
# the rows fetched for this server become the third constructor argument
54 queries
= dns_queries(dns_IP
, dns_id
, cursor
.fetchall())
56 dnss_to_query
.append(queries
)
# Callback handed to ctx.resolve_async() in query_dns.
#
# Visible steps: print the resolved name, map the outcome to a textual
# `result_info`, insert one row into user_side_responses, read back a value
# as `responses_id`, and - when status is 0 and the result has data - insert
# one user_side_response row per returned address. A psycopg2.IntegrityError
# raised on the way is caught.
#
# Parameters:
#   mydata - the object given to resolve_async() as its second argument
#   status - resolver status; 0 is required before answer data is stored
#   result - resolver result; qname, rcode, rcode_str, havedata and
#            data.address_list are read from it
#
# NOTE(review): the binding of `query` is not visible in this excerpt;
# presumably it is `mydata` (a single_query) - confirm against the full file.
60 def resolve_call_back(mydata
, status
, result
):
63 print("callback called for {}".format(result
.qname
))
# NOTE(review): the condition guarding this first branch is not visible here;
# presumably it tests for a non-zero `status` - confirm.
65 result_info
= 'internal failure: out of memory'
66 elif result
.rcode
== 0:
67 result_info
= 'successful'
68 print("Result:",result
.data
.address_list
)
# rcode 2 is SERVFAIL in the DNS protocol
69 elif result
.rcode
== 2:
70 result_info
= 'no response'
# rcode 3 is NXDOMAIN in the DNS protocol
71 elif result
.rcode
== 3:
72 result_info
= 'not exists'
# NOTE(review): presumably the fallback for any other rcode; the line
# introducing this branch is not visible here.
74 result_info
= 'DNS error: {}'.format(result
.rcode_str
)
# record the outcome of this query
78 query
.cursor
.execute('''
79 INSERT INTO user_side_responses
80 (date, result, dns_id, service_id, vpn_id)
81 VALUES (%s, %s, %s, %s, %s)
83 ''', (query
.hour
, result_info
, query
.dns_id
,
84 query
.service_id
, query
.vpn_id
))
# NOTE(review): fetchone() after an INSERT implies that the statement returns
# a row (e.g. through a RETURNING clause); that part is not visible here.
86 responses_id
= query
.cursor
.fetchone()[0]
# addresses are stored only when the resolver reported success and has data
88 if status
==0 and result
.havedata
:
89 for address
in result
.data
.address_list
:
90 query
.cursor
.execute('''
91 INSERT INTO user_side_response (returned_ip, responses_id)
93 ''', (address
, responses_id
))
# NOTE(review): the matching `try:` line and the body of this handler are not
# visible in this excerpt.
94 except psycopg2
.IntegrityError
:
95 # Unique constraint is stopping us from adding duplicates;
96 # This is most likely because back-end has been run multiple times
97 # during the same hour (bad configuration or admin running manually
100 # no committing, since auto-commit mode is set on the connection
# Configuration obtained from ztdnslib; it is passed to start_db_connection()
# wherever this file opens a database connection.
104 config
= get_ztdns_config()
# Thread target: start the asynchronous resolutions planned for one DNS server.
#
# Visible steps: open a database connection and cursor of its own, create an
# unbound context forwarding to dns_queries.dns_IP, then for every
# (service_id, service_name) pair in dns_queries.services wait 0.4 s, print a
# progress message, build a single_query and start an asynchronous A/IN
# lookup whose callback is resolve_call_back.
#
# Parameters:
#   dns_queries - object providing dns_IP, dns_id and services. The parameter
#                 name is the same as the name `dns_queries` that
#                 query_planned_queries calls as a constructor, so inside
#                 this function the name refers to the argument.
#
# `hour` and `vpn_id` are not parameters; they are read from an outer scope
# whose definitions are not visible in this excerpt.
# NOTE(review): whatever follows resolve_async (e.g. waiting for outstanding
# requests) is not visible here.
106 def query_dns(dns_queries
):
107 connection
= start_db_connection(config
)
108 cursor
= connection
.cursor()
109 ctx
= unbound
.ub_ctx()
# forward all lookups of this context to the DNS server under test
110 ctx
.set_fwd(dns_queries
.dns_IP
)
113 for service_id
, service_name
in dns_queries
.services
:
117 sleep(0.4) # throttle between queries
119 print("starting resolution of {} through {}".format(service_name
,
# this object is what resolve_call_back receives as `mydata`
121 query
= single_query(hour
, cursor
, vpn_id
,
122 dns_queries
.dns_id
, service_id
)
124 ctx
.resolve_async(service_name
, query
, resolve_call_back
,
125 unbound
.RR_TYPE_A
, unbound
.RR_CLASS_IN
)
# Driver: open a connection to read the planned queries, then create one
# Thread per dns_queries object with query_dns as its target.
# NOTE(review): the definitions of `hour`, `vpn_id` and `threads`, the call
# that starts each thread and the body of the final loop are not visible in
# this excerpt; presumably the final loop joins the threads - confirm.
131 connection
= start_db_connection(config
)
132 cursor
= connection
.cursor()
133 planned_queries
= query_planned_queries(cursor
, hour
, vpn_id
)
134 # each thread will make its own connection
# NOTE(review): if this loop runs at module scope, its loop variable rebinds
# the name `dns_queries` that query_planned_queries calls as a constructor;
# that call has already completed by this point.
139 for dns_queries
in planned_queries
:
140 thread
= Thread(target
= query_dns
, args
= (dns_queries
,))
142 threads
.append(thread
)
144 for thread
in threads
: