4 from threading
import Thread
9 # our own module used by several scripts in the project
10 from ztdnslib
import start_db_connection
, get_ztdns_config
, log
, set_loghour
class dns_queries:
    """All queries planned for one DNS server.

    NOTE(review): the ``class`` header was dropped by the extraction; the
    name and constructor arity are grounded by the call at
    ``dns_queries(dns_IP, dns_id, cursor.fetchall())`` in
    query_planned_queries().

    dns_IP   -- IP address of the DNS server to forward queries to
    dns_id   -- database id of that server (user_side_dns.id)
    services -- list of [service_id, service_name] pairs to resolve
    """
    def __init__(self, dns_IP, dns_id, services):
        self.dns_IP = dns_IP
        self.dns_id = dns_id
        self.services = services
class single_query:
    """Context for one asynchronous DNS resolution, passed as ``mydata``
    to unbound's resolve_async() and read back in resolve_call_back().

    NOTE(review): the ``class`` header was dropped by the extraction; the
    name and constructor arity are grounded by the call
    ``single_query(hour, cursor, vpn_id, dns_queries.dns_id, service_id)``
    in query_dns(), and the attributes by the ``query.hour`` /
    ``query.cursor`` / ``query.vpn_id`` / ``query.dns_id`` /
    ``query.service_id`` reads in resolve_call_back().

    hour       -- timestamp the whole run is recorded under
    cursor     -- open database cursor the callback writes through
    vpn_id     -- id of the VPN this run is made for
    dns_id     -- id of the DNS server being queried
    service_id -- id of the service (domain) being resolved
    """
    def __init__(self, hour, cursor, vpn_id, dns_id, service_id):
        self.hour = hour
        self.cursor = cursor
        self.vpn_id = vpn_id
        self.dns_id = dns_id
        self.service_id = service_id
def query_planned_queries(cursor, hour, vpn_id):
    """Return the list of dns_queries objects planned for this vpn.

    Groups all still-valid rows of user_side_queries for ``vpn_id`` by
    DNS server, e.g.:

    # dns server IP | dns server id | service_id | service_name
    # dns_queries("195.98.79.117", 23, [[89, "devuan.org"],
    #                                   [112, "invidio.us"]]),
    # dns_queries("192.71.245.208", 33, [[77, "debian.org"],
    #                                    [22, "nie.ma.takiej.domeny"], ...

    ``hour`` is accepted for interface compatibility; the visible code
    does not use it (NOTE(review): some dropped lines may have — confirm).
    """
    # Every distinct DNS server that has queries planned for this vpn.
    # NOTE(review): the JOIN/WHERE tail of this statement was dropped by
    # the extraction and is reconstructed — confirm against history.
    cursor.execute('''
        SELECT DISTINCT d."IP", d.id
        FROM user_side_queries AS q JOIN user_side_dns AS d
        ON d.id = q.dns_id
        WHERE q.vpn_id = %s
    ''', (vpn_id,))
    dnss = cursor.fetchall()
    dnss_to_query = []
    for dns_IP, dns_id in dnss:
        # All services planned for this particular DNS server.
        cursor.execute('''
            SELECT s.id, s.web_address
            FROM user_side_service AS s JOIN user_side_queries AS q
            ON s.id = q.service_id
            WHERE q.vpn_id = %s AND q.dns_id = %s
        ''', (vpn_id, dns_id))
        queries = dns_queries(dns_IP, dns_id, cursor.fetchall())
        dnss_to_query.append(queries)
    return dnss_to_query
def resolve_call_back(mydata, status, result):
    """Unbound async-resolution callback: record one result in the database.

    mydata -- the single_query instance handed to ctx.resolve_async();
              carries the db cursor and the (hour, dns, service, vpn) ids.
    status -- libunbound status code; non-zero means the resolution itself
              failed internally (never reached the rcode stage).
    result -- libunbound result object (qname, rcode, rcode_str, data,
              havedata).
    """
    query = mydata
    print("callback called for {}".format(result.qname))
    # Map the outcome onto the human-readable string stored in the db.
    if status != 0:  # NOTE(review): condition reconstructed from dropped line — confirm
        result_info = 'internal failure: out of memory'
    elif result.rcode == 0:
        result_info = 'successful'
        print("Result:", result.data.address_list)
    elif result.rcode == 2:
        result_info = 'no response'
    elif result.rcode == 3:
        result_info = 'not exists'
    else:
        result_info = 'DNS error: {}'.format(result.rcode_str)
    try:
        # Record summary + bookkeeping in one transaction so a crash
        # cannot leave a half-recorded result.
        query.cursor.connection.autocommit = False
        query.cursor.execute('''
            INSERT INTO user_side_responses
                   (date, result, dns_id, service_id, vpn_id)
            VALUES (%s, %s, %s, %s, %s)
            RETURNING id
        ''', (query.hour, result_info, query.dns_id,
              query.service_id, query.vpn_id))
        # id of the row just inserted (RETURNING id above)
        responses_id = query.cursor.fetchone()[0]
        # Decrement the planned query's remaining validity and drop it
        # once spent; an even better solution would be to have a trigger
        # delete the record when validity reaches 0.
        # NOTE(review): the DELETE's validity condition was dropped by the
        # extraction and is reconstructed — confirm against history.
        query.cursor.execute('''
            UPDATE user_side_queries
            SET validity = validity - 1
            WHERE dns_id = %s AND service_id = %s AND vpn_id = %s;
            DELETE FROM user_side_queries
            WHERE dns_id = %s AND service_id = %s AND vpn_id = %s AND
                  validity <= 0
        ''', (query.dns_id, query.service_id, query.vpn_id,
              query.dns_id, query.service_id, query.vpn_id))
        query.cursor.connection.commit()
        query.cursor.connection.autocommit = True
        if status == 0 and result.havedata:
            # Store every address the server returned for this query.
            for address in result.data.address_list:
                query.cursor.execute('''
                    INSERT INTO user_side_response (returned_ip, responses_id)
                    VALUES (%s, %s)
                ''', (address, responses_id))
    except psycopg2.IntegrityError:
        query.cursor.connection.rollback()
        # Unique constraint is stopping us from adding duplicates;
        # This is most likely because back-end has been run multiple times
        # during the same hour (bad configuration or admin running manually
        # after cron), we'll write to logs about that.
        log('duplicate response for dns {}, service {}, vpn {}'.format(
            query.dns_id, query.service_id, query.vpn_id))
# NOTE(review): `hour` is expected to be bound by earlier lines that were
# dropped from this chunk (presumably parsed from the invocation) — confirm.
set_loghour(hour)  # log() function will now prepend messages with hour
# Project-wide configuration used below for every db connection.
config = get_ztdns_config()
def query_dns(dns_queries):
    """Resolve every planned service through one DNS server (thread body).

    dns_queries -- a dns_queries instance (server IP, server id, and the
                   [service_id, service_name] pairs to resolve).

    Opens its own db connection (each thread makes one), forwards all
    queries to dns_queries.dns_IP via libunbound, and fires them
    asynchronously; resolve_call_back() does the recording.
    Reads module globals: config, hour, vpn_id.
    """
    connection = start_db_connection(config)
    cursor = connection.cursor()
    ctx = unbound.ub_ctx()
    # Forward all resolutions through this particular DNS server.
    ctx.set_fwd(dns_queries.dns_IP)
    for service_id, service_name in dns_queries.services:
        sleep(0.4)  # throttle between queries
        print("starting resolution of {} through {}".format(service_name,
                                                            dns_queries.dns_IP))
        query = single_query(hour, cursor, vpn_id,
                             dns_queries.dns_id, service_id)
        ctx.resolve_async(service_name, query, resolve_call_back,
                          unbound.RR_TYPE_A, unbound.RR_CLASS_IN)
    # NOTE(review): the tail of this function was dropped by the
    # extraction; resolve_async() requires a wait for the callbacks to
    # run, and the per-thread connection should be closed — confirm.
    ctx.wait()
    connection.close()
# Plan this run's queries and fan them out, one thread per DNS server.
connection = start_db_connection(config)
cursor = connection.cursor()
planned_queries = query_planned_queries(cursor, hour, vpn_id)
# each thread will make its own connection
# NOTE(review): the surrounding conditional and the thread start/join
# lines were dropped by the extraction and are reconstructed; the
# visible log message implies an "already done" branch — confirm.
if planned_queries:
    threads = []
    for dns_queries in planned_queries:
        thread = Thread(target=query_dns, args=(dns_queries,))
        thread.start()
        threads.append(thread)
    for thread in threads:
        thread.join()
else:
    log('results already exist for vpn {}'.format(vpn_id))