1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin <kai@samba.org> 2011
3 # Copyright (C) Ralph Boehme <slow@samba.org> 2016
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 import samba
.getopt
as options
22 import samba
.ndr
as ndr
23 from samba
.dcerpc
import dns
24 from samba
.tests
.subunitrun
import SubunitOptions
, TestProgram
25 from samba
.tests
.dns_base
import DNSTKeyTest
27 parser
= optparse
.OptionParser("dns_tkey.py <server name> <server ip> [options]")
28 sambaopts
= options
.SambaOptions(parser
)
29 parser
.add_option_group(sambaopts
)
31 # This timeout only has relevance when testing against Windows
32 # Format errors tend to return patchy responses, so a timeout is needed.
33 parser
.add_option("--timeout", type="int", dest
="timeout",
34 help="Specify timeout for DNS requests")
36 # use command line creds if available
37 credopts
= options
.CredentialsOptions(parser
)
38 parser
.add_option_group(credopts
)
39 subunitopts
= SubunitOptions(parser
)
40 parser
.add_option_group(subunitopts
)
42 opts
, args
= parser
.parse_args()
43 timeout
= opts
.timeout
53 class TestDNSUpdates(DNSTKeyTest
):
55 self
.server
= server_name
56 self
.server_ip
= server_ip
59 def test_tkey_gss_tsig(self
):
60 "test DNS TKEY handshake with gss-tsig"
64 def test_tkey_gss_microsoft_com(self
):
65 "test DNS TKEY handshake with gss.microsoft.com"
67 self
.tkey_trans(algorithm_name
="gss.microsoft.com")
69 def test_tkey_invalid_gss_TSIG(self
):
70 "test DNS TKEY handshake with invalid gss-TSIG"
72 self
.tkey_trans(algorithm_name
="gss-TSIG",
73 expected_rcode
=dns
.DNS_RCODE_REFUSED
)
75 def test_tkey_invalid_gss_MICROSOFT_com(self
):
76 "test DNS TKEY handshake with invalid gss.MICROSOFT.com"
78 self
.tkey_trans(algorithm_name
="gss.MICROSOFT.com",
79 expected_rcode
=dns
.DNS_RCODE_REFUSED
)
81 def test_update_wo_tsig(self
):
82 "test DNS update without TSIG record"
84 p
= self
.make_update_request()
85 (response
, response_p
) = self
.dns_transaction_udp(p
, self
.server_ip
)
86 self
.assert_echoed_dns_error(p
, response
, response_p
, dns
.DNS_RCODE_REFUSED
)
88 rcode
= self
.search_record(self
.newrecname
)
89 self
.assert_rcode_equals(rcode
, dns
.DNS_RCODE_NXDOMAIN
)
91 def test_update_tsig_bad_keyname(self
):
92 "test DNS update with a TSIG record with a bad keyname"
96 p
= self
.make_update_request()
97 self
.sign_packet(p
, "badkey")
98 (response
, response_p
) = self
.dns_transaction_udp(p
, self
.server_ip
)
99 self
.assert_echoed_dns_error(p
, response
, response_p
, dns
.DNS_RCODE_REFUSED
)
101 rcode
= self
.search_record(self
.newrecname
)
102 self
.assert_rcode_equals(rcode
, dns
.DNS_RCODE_NXDOMAIN
)
104 def test_update_tsig_bad_mac(self
):
105 "test DNS update with a TSIG record with a bad MAC"
109 p
= self
.make_update_request()
110 self
.bad_sign_packet(p
, self
.tkey
['name'])
111 (response
, response_p
) = self
.dns_transaction_udp(p
, self
.server_ip
)
112 self
.assert_echoed_dns_error(p
, response
, response_p
, dns
.DNS_RCODE_REFUSED
)
114 rcode
= self
.search_record(self
.newrecname
)
115 self
.assert_rcode_equals(rcode
, dns
.DNS_RCODE_NXDOMAIN
)
117 def test_update_tsig_bad_algorithm(self
):
118 "test DNS update with a TSIG record with a bad algorithm"
122 algorithm_name
= "gss-TSIG"
123 p
= self
.make_update_request()
124 mac
= self
.sign_packet(p
, self
.tkey
['name'],
125 algorithm_name
=algorithm_name
)
126 (response
, response_p
) = self
.dns_transaction_udp(p
, self
.server_ip
)
127 self
.assert_echoed_dns_error(p
, response
, response_p
, dns
.DNS_RCODE_REFUSED
)
129 rcode
= self
.search_record(self
.newrecname
)
130 self
.assert_rcode_equals(rcode
, dns
.DNS_RCODE_NXDOMAIN
)
132 def test_update_tsig_changed_algorithm1(self
):
133 "test DNS update with a TSIG record with a changed algorithm"
135 algorithm_name
= "gss-tsig"
136 self
.tkey_trans(algorithm_name
=algorithm_name
)
138 # Now delete the record, it's most likely
139 # a no-op as it should not be there if the test
140 # runs the first time
141 p
= self
.make_update_request(delete
=True)
142 mac
= self
.sign_packet(p
, self
.tkey
['name'], algorithm_name
=algorithm_name
)
143 (response
, response_p
) = self
.dns_transaction_udp(p
, self
.server_ip
)
144 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
145 self
.verify_packet(response
, response_p
, mac
)
147 # Now do an update with the algorithm_name
148 # changed in the requests TSIG message.
149 p
= self
.make_update_request()
150 algorithm_name
= "gss.microsoft.com"
151 mac
= self
.sign_packet(p
, self
.tkey
['name'],
152 algorithm_name
=algorithm_name
)
153 algorithm_name
= "gss-tsig"
154 (response
, response_p
) = self
.dns_transaction_udp(p
, self
.server_ip
,
155 allow_remaining
=True)
156 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
157 self
.verify_packet(response
, response_p
, mac
)
159 # Check the record is around
160 rcode
= self
.search_record(self
.newrecname
)
161 self
.assert_rcode_equals(rcode
, dns
.DNS_RCODE_OK
)
163 # Now delete the record, with the original
164 # algorithm_name used in the tkey exchange
165 p
= self
.make_update_request(delete
=True)
166 mac
= self
.sign_packet(p
, self
.tkey
['name'], algorithm_name
=algorithm_name
)
167 (response
, response_p
) = self
.dns_transaction_udp(p
, self
.server_ip
)
168 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
169 self
.verify_packet(response
, response_p
, mac
)
171 rcode
= self
.search_record(self
.newrecname
)
172 self
.assert_rcode_equals(rcode
, dns
.DNS_RCODE_NXDOMAIN
)
174 def test_update_tsig_changed_algorithm2(self
):
175 "test DNS update with a TSIG record with a changed algorithm"
177 algorithm_name
= "gss.microsoft.com"
178 self
.tkey_trans(algorithm_name
=algorithm_name
)
180 # Now delete the record, it's most likely
181 # a no-op as it should not be there if the test
182 # runs the first time
183 p
= self
.make_update_request(delete
=True)
184 mac
= self
.sign_packet(p
, self
.tkey
['name'], algorithm_name
=algorithm_name
)
185 (response
, response_p
) = self
.dns_transaction_udp(p
, self
.server_ip
)
186 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
187 self
.verify_packet(response
, response_p
, mac
)
189 # Now do an update with the algorithm_name
190 # changed in the requests TSIG message.
191 p
= self
.make_update_request()
192 algorithm_name
= "gss-tsig"
193 mac
= self
.sign_packet(p
, self
.tkey
['name'],
194 algorithm_name
=algorithm_name
)
195 algorithm_name
= "gss.microsoft.com"
196 (response
, response_p
) = self
.dns_transaction_udp(p
, self
.server_ip
,
197 allow_truncated
=True)
198 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
199 response_p_pack
= ndr
.ndr_pack(response
)
200 if len(response_p_pack
) == len(response_p
):
201 self
.verify_packet(response
, response_p
, mac
)
205 # Check the record is around
206 rcode
= self
.search_record(self
.newrecname
)
207 self
.assert_rcode_equals(rcode
, dns
.DNS_RCODE_OK
)
209 # Now delete the record, with the original
210 # algorithm_name used in the tkey exchange
211 p
= self
.make_update_request(delete
=True)
212 mac
= self
.sign_packet(p
, self
.tkey
['name'], algorithm_name
=algorithm_name
)
213 (response
, response_p
) = self
.dns_transaction_udp(p
, self
.server_ip
)
214 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
215 self
.verify_packet(response
, response_p
, mac
)
217 rcode
= self
.search_record(self
.newrecname
)
218 self
.assert_rcode_equals(rcode
, dns
.DNS_RCODE_NXDOMAIN
)
220 def test_update_gss_tsig_tkey_req_additional(self
):
221 "test DNS update with correct gss-tsig record tkey req in additional"
225 p
= self
.make_update_request()
226 mac
= self
.sign_packet(p
, self
.tkey
['name'])
227 (response
, response_p
) = self
.dns_transaction_udp(p
, self
.server_ip
)
228 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
229 self
.verify_packet(response
, response_p
, mac
)
231 # Check the record is around
232 rcode
= self
.search_record(self
.newrecname
)
233 self
.assert_rcode_equals(rcode
, dns
.DNS_RCODE_OK
)
235 # Now delete the record
236 p
= self
.make_update_request(delete
=True)
237 mac
= self
.sign_packet(p
, self
.tkey
['name'])
238 (response
, response_p
) = self
.dns_transaction_udp(p
, self
.server_ip
)
239 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
240 self
.verify_packet(response
, response_p
, mac
)
243 rcode
= self
.search_record(self
.newrecname
)
244 self
.assert_rcode_equals(rcode
, dns
.DNS_RCODE_NXDOMAIN
)
246 def test_update_gss_tsig_tkey_req_answers(self
):
247 "test DNS update with correct gss-tsig record tsig req in answers"
249 self
.tkey_trans(tkey_req_in_answers
=True)
251 p
= self
.make_update_request()
252 mac
= self
.sign_packet(p
, self
.tkey
['name'])
253 (response
, response_p
) = self
.dns_transaction_udp(p
, self
.server_ip
)
254 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
255 self
.verify_packet(response
, response_p
, mac
)
257 # Check the record is around
258 rcode
= self
.search_record(self
.newrecname
)
259 self
.assert_rcode_equals(rcode
, dns
.DNS_RCODE_OK
)
261 # Now delete the record
262 p
= self
.make_update_request(delete
=True)
263 mac
= self
.sign_packet(p
, self
.tkey
['name'])
264 (response
, response_p
) = self
.dns_transaction_udp(p
, self
.server_ip
)
265 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
266 self
.verify_packet(response
, response_p
, mac
)
269 rcode
= self
.search_record(self
.newrecname
)
270 self
.assert_rcode_equals(rcode
, dns
.DNS_RCODE_NXDOMAIN
)
272 def test_update_gss_microsoft_com_tkey_req_additional(self
):
273 "test DNS update with correct gss.microsoft.com record tsig req in additional"
275 algorithm_name
= "gss.microsoft.com"
276 self
.tkey_trans(algorithm_name
=algorithm_name
)
278 p
= self
.make_update_request()
279 mac
= self
.sign_packet(p
, self
.tkey
['name'],
280 algorithm_name
=algorithm_name
)
281 (response
, response_p
) = self
.dns_transaction_udp(p
, self
.server_ip
)
282 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
283 self
.verify_packet(response
, response_p
, mac
)
285 # Check the record is around
286 rcode
= self
.search_record(self
.newrecname
)
287 self
.assert_rcode_equals(rcode
, dns
.DNS_RCODE_OK
)
289 # Now delete the record
290 p
= self
.make_update_request(delete
=True)
291 mac
= self
.sign_packet(p
, self
.tkey
['name'],
292 algorithm_name
=algorithm_name
)
293 (response
, response_p
) = self
.dns_transaction_udp(p
, self
.server_ip
)
294 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
295 self
.verify_packet(response
, response_p
, mac
)
298 rcode
= self
.search_record(self
.newrecname
)
299 self
.assert_rcode_equals(rcode
, dns
.DNS_RCODE_NXDOMAIN
)
301 def test_update_gss_microsoft_com_tkey_req_answers(self
):
302 "test DNS update with correct gss.microsoft.com record tsig req in answers"
304 algorithm_name
= "gss.microsoft.com"
305 self
.tkey_trans(algorithm_name
=algorithm_name
,
306 tkey_req_in_answers
=True)
308 p
= self
.make_update_request()
309 mac
= self
.sign_packet(p
, self
.tkey
['name'],
310 algorithm_name
=algorithm_name
)
311 (response
, response_p
) = self
.dns_transaction_udp(p
, self
.server_ip
)
312 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
313 self
.verify_packet(response
, response_p
, mac
)
315 # Check the record is around
316 rcode
= self
.search_record(self
.newrecname
)
317 self
.assert_rcode_equals(rcode
, dns
.DNS_RCODE_OK
)
319 # Now delete the record
320 p
= self
.make_update_request(delete
=True)
321 mac
= self
.sign_packet(p
, self
.tkey
['name'],
322 algorithm_name
=algorithm_name
)
323 (response
, response_p
) = self
.dns_transaction_udp(p
, self
.server_ip
)
324 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
325 self
.verify_packet(response
, response_p
, mac
)
328 rcode
= self
.search_record(self
.newrecname
)
329 self
.assert_rcode_equals(rcode
, dns
.DNS_RCODE_NXDOMAIN
)
331 def test_update_tsig_windows(self
):
332 "test DNS update with correct TSIG record (follow Windows pattern)"
334 p
= self
.make_update_request()
336 rr_class
= dns
.DNS_QCLASS_IN
341 r
.name
= self
.newrecname
342 r
.rr_type
= dns
.DNS_QTYPE_A
343 r
.rr_class
= dns
.DNS_QCLASS_ANY
348 r
.name
= self
.newrecname
349 r
.rr_type
= dns
.DNS_QTYPE_AAAA
350 r
.rr_class
= dns
.DNS_QCLASS_ANY
355 r
.name
= self
.newrecname
356 r
.rr_type
= dns
.DNS_QTYPE_A
357 r
.rr_class
= rr_class
360 r
.rdata
= "10.1.45.64"
362 p
.nscount
= len(updates
)
367 r
.name
= self
.newrecname
368 r
.rr_type
= dns
.DNS_QTYPE_CNAME
369 r
.rr_class
= dns
.DNS_QCLASS_NONE
373 p
.ancount
= len(prereqs
)
376 (response
, response_p
) = self
.dns_transaction_udp(p
, self
.server_ip
)
377 self
.assert_echoed_dns_error(p
, response
, response_p
, dns
.DNS_RCODE_REFUSED
)
380 mac
= self
.sign_packet(p
, self
.tkey
['name'])
381 (response
, response_p
) = self
.dns_transaction_udp(p
, self
.server_ip
)
382 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
383 self
.verify_packet(response
, response_p
, mac
)
385 # Check the record is around
386 rcode
= self
.search_record(self
.newrecname
)
387 self
.assert_rcode_equals(rcode
, dns
.DNS_RCODE_OK
)
389 # Now delete the record
392 r
.name
= self
.newrecname
393 r
.rr_type
= dns
.DNS_QTYPE_A
394 r
.rr_class
= dns
.DNS_QCLASS_NONE
397 r
.rdata
= "10.1.45.64"
398 delete_updates
.append(r
)
399 p
= self
.make_update_request(delete
=True)
400 p
.nscount
= len(delete_updates
)
401 p
.nsrecs
= delete_updates
402 mac
= self
.sign_packet(p
, self
.tkey
['name'])
403 (response
, response_p
) = self
.dns_transaction_udp(p
, self
.server_ip
)
404 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
405 self
.verify_packet(response
, response_p
, mac
)
408 rcode
= self
.search_record(self
.newrecname
)
409 self
.assert_rcode_equals(rcode
, dns
.DNS_RCODE_NXDOMAIN
)
411 def test_update_tsig_record_access_denied(self
):
412 """test DNS update with a TSIG record where the user does not have
413 permissions to change the record"""
418 # First create the record as admin
419 p
= self
.make_update_request()
420 mac
= self
.sign_packet(p
, self
.tkey
['name'])
421 (response
, response_p
) = self
.dns_transaction_udp(p
, self
.server_ip
)
422 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
423 self
.verify_packet(response
, response_p
, mac
)
425 # Check the record is around
426 rcode
= self
.search_record(self
.newrecname
)
427 self
.assert_rcode_equals(rcode
, dns
.DNS_RCODE_OK
)
429 # Now update the same values as normal user
430 # should work without error
431 self
.tkey_trans(creds
=self
.get_unpriv_creds())
432 unpriv_tkey
= self
.tkey
434 p
= self
.make_update_request()
435 mac
= self
.sign_packet(p
, self
.tkey
['name'])
436 (response
, response_p
) = self
.dns_transaction_udp(p
, self
.server_ip
)
437 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
438 self
.verify_packet(response
, response_p
, mac
)
440 # Check the record is still around
441 rcode
= self
.search_record(self
.newrecname
)
442 self
.assert_rcode_equals(rcode
, dns
.DNS_RCODE_OK
)
444 # Now try to delete the record a normal user (should fail)
445 p
= self
.make_update_request(delete
=True)
446 mac
= self
.sign_packet(p
, self
.tkey
['name'])
447 (response
, response_p
) = self
.dns_transaction_udp(p
, self
.server_ip
)
448 self
.assert_echoed_dns_error(p
, response
, response_p
, dns
.DNS_RCODE_REFUSED
)
450 # Check the record is still around
451 rcode
= self
.search_record(self
.newrecname
)
452 self
.assert_rcode_equals(rcode
, dns
.DNS_RCODE_OK
)
454 # Now delete the record as admin
456 p
= self
.make_update_request(delete
=True)
457 mac
= self
.sign_packet(p
, self
.tkey
['name'])
458 (response
, response_p
) = self
.dns_transaction_udp(p
, self
.server_ip
)
459 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
460 self
.verify_packet(response
, response_p
, mac
)
463 rcode
= self
.search_record(self
.newrecname
)
464 self
.assert_rcode_equals(rcode
, dns
.DNS_RCODE_NXDOMAIN
)
467 TestProgram(module
=__name__
, opts
=subunitopts
)