1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett 2007
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 from samba
import credentials
20 from samba
.dcerpc
import dns
, dnsserver
21 from samba
.dnsserver
import record_from_string
22 from samba
.tests
.subunitrun
import SubunitOptions
, TestProgram
23 from samba
import werror
, WERRORError
24 from samba
.tests
.dns_base
import DNSTest
25 import samba
.getopt
as options
28 parser
= optparse
.OptionParser(
29 "dns_wildcard.py <server name> <server ip> [options]")
30 sambaopts
= options
.SambaOptions(parser
)
31 parser
.add_option_group(sambaopts
)
33 # This timeout only has relevance when testing against Windows
34 # Format errors tend to return patchy responses, so a timeout is needed.
35 parser
.add_option("--timeout", type="int", dest
="timeout",
36 help="Specify timeout for DNS requests")
38 # To run against Windows
39 # python python/samba/tests/dns_wildcard.py computer_name ip
40 # -U"Administrator%admin_password"
45 # use command line creds if available
46 credopts
= options
.CredentialsOptions(parser
)
47 parser
.add_option_group(credopts
)
48 subunitopts
= SubunitOptions(parser
)
49 parser
.add_option_group(subunitopts
)
51 opts
, args
= parser
.parse_args()
53 lp
= sambaopts
.get_loadparm()
54 creds
= credopts
.get_credentials(lp
)
56 timeout
= opts
.timeout
64 creds
.set_krb_forwardable(credentials
.NO_KRB_FORWARDABLE
)
66 WILDCARD_IP
= "1.1.1.1"
67 WILDCARD
= "*.wildcardtest"
69 EXACT
= "exact.wildcardtest"
70 LEVEL2_WILDCARD_IP
= "1.1.1.3"
71 LEVEL2_WILDCARD
= "*.level2.wildcardtest"
72 LEVEL2_EXACT_IP
= "1.1.1.4"
73 LEVEL2_EXACT
= "exact.level2.wildcardtest"
76 class TestWildCardQueries(DNSTest
):
80 global server
, server_ip
, lp
, creds
, timeout
81 self
.server
= server_name
82 self
.server_ip
= server_ip
85 self
.timeout
= timeout
87 # Create the dns records
88 self
.dns_records
= [(dns
.DNS_QTYPE_A
,
89 "%s.%s" % (WILDCARD
, self
.get_dns_domain()),
92 "%s.%s" % (EXACT
, self
.get_dns_domain()),
97 self
.get_dns_domain())),
102 self
.get_dns_domain())),
105 c
= self
.dns_connect()
106 for (typ
, name
, data
) in self
.dns_records
:
107 self
.add_record(c
, typ
, name
, data
)
110 c
= self
.dns_connect()
111 for (typ
, name
, data
) in self
.dns_records
:
112 self
.delete_record(c
, typ
, name
, data
)
114 def dns_connect(self
):
115 binding_str
= "ncacn_ip_tcp:%s[sign]" % self
.server_ip
116 return dnsserver
.dnsserver(binding_str
, self
.lp
, self
.creds
)
118 def delete_record(self
, dns_conn
, typ
, name
, data
):
120 rec
= record_from_string(typ
, data
)
121 del_rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
122 del_rec_buf
.rec
= rec
125 dns_conn
.DnssrvUpdateRecord2(dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
128 self
.get_dns_domain(),
132 except WERRORError
as e
:
133 # Ignore record does not exist errors
134 if e
.args
[0] != werror
.WERR_DNS_ERROR_NAME_DOES_NOT_EXIST
:
137 def add_record(self
, dns_conn
, typ
, name
, data
):
139 rec
= record_from_string(typ
, data
)
140 add_rec_buf
= dnsserver
.DNS_RPC_RECORD_BUF()
141 add_rec_buf
.rec
= rec
143 dns_conn
.DnssrvUpdateRecord2(dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
146 self
.get_dns_domain(),
150 except WERRORError
as e
:
153 def test_one_a_query_match_wildcard(self
):
154 "Query an A record, should match the wildcard entry"
156 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
160 name
= "miss.wildcardtest.%s" % self
.get_dns_domain()
161 q
= self
.make_name_question(name
,
166 self
.finish_name_packet(p
, questions
)
167 (response
, response_packet
) =\
168 self
.dns_transaction_udp(p
, host
=self
.server_ip
)
169 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
170 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
171 self
.assertEqual(response
.ancount
, 1)
172 self
.assertEqual(response
.answers
[0].rr_type
, dns
.DNS_QTYPE_A
)
173 self
.assertEqual(response
.answers
[0].rdata
, WILDCARD_IP
)
175 def test_one_a_query_match_wildcard_2_labels(self
):
176 """ Query an A record, should match the wild card entry
177 have two labels to the left of the wild card target.
180 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
184 name
= "label2.label1.wildcardtest.%s" % self
.get_dns_domain()
185 q
= self
.make_name_question(name
,
190 self
.finish_name_packet(p
, questions
)
191 (response
, response_packet
) =\
192 self
.dns_transaction_udp(p
, host
=self
.server_ip
)
193 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
194 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
195 self
.assertEqual(response
.ancount
, 1)
196 self
.assertEqual(response
.answers
[0].rr_type
, dns
.DNS_QTYPE_A
)
197 self
.assertEqual(response
.answers
[0].rdata
, WILDCARD_IP
)
199 def test_one_a_query_wildcard_entry(self
):
200 "Query the wildcard entry"
202 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
206 name
= "%s.%s" % (WILDCARD
, self
.get_dns_domain())
207 q
= self
.make_name_question(name
,
212 self
.finish_name_packet(p
, questions
)
213 (response
, response_packet
) =\
214 self
.dns_transaction_udp(p
, host
=self
.server_ip
)
215 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
216 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
217 self
.assertEqual(response
.ancount
, 1)
218 self
.assertEqual(response
.answers
[0].rr_type
, dns
.DNS_QTYPE_A
)
219 self
.assertEqual(response
.answers
[0].rdata
, WILDCARD_IP
)
221 def test_one_a_query_exact_match(self
):
222 """Query an entry that matches the wild card but has an exact match as
225 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
229 name
= "%s.%s" % (EXACT
, self
.get_dns_domain())
230 q
= self
.make_name_question(name
,
235 self
.finish_name_packet(p
, questions
)
236 (response
, response_packet
) =\
237 self
.dns_transaction_udp(p
, host
=self
.server_ip
)
238 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
239 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
240 self
.assertEqual(response
.ancount
, 1)
241 self
.assertEqual(response
.answers
[0].rr_type
, dns
.DNS_QTYPE_A
)
242 self
.assertEqual(response
.answers
[0].rdata
, EXACT_IP
)
244 def test_one_a_query_match_wildcard_l2(self
):
245 "Query an A record, should match the level 2 wildcard entry"
247 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
251 name
= "miss.level2.wildcardtest.%s" % self
.get_dns_domain()
252 q
= self
.make_name_question(name
,
257 self
.finish_name_packet(p
, questions
)
258 (response
, response_packet
) =\
259 self
.dns_transaction_udp(p
, host
=self
.server_ip
)
260 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
261 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
262 self
.assertEqual(response
.ancount
, 1)
263 self
.assertEqual(response
.answers
[0].rr_type
, dns
.DNS_QTYPE_A
)
264 self
.assertEqual(response
.answers
[0].rdata
, LEVEL2_WILDCARD_IP
)
266 def test_one_a_query_match_wildcard_l2_2_labels(self
):
267 """Query an A record, should match the level 2 wild card entry
268 have two labels to the left of the wild card target
271 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
275 name
= "label1.label2.level2.wildcardtest.%s" % self
.get_dns_domain()
276 q
= self
.make_name_question(name
,
281 self
.finish_name_packet(p
, questions
)
282 (response
, response_packet
) =\
283 self
.dns_transaction_udp(p
, host
=self
.server_ip
)
284 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
285 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
286 self
.assertEqual(response
.ancount
, 1)
287 self
.assertEqual(response
.answers
[0].rr_type
, dns
.DNS_QTYPE_A
)
288 self
.assertEqual(response
.answers
[0].rdata
, LEVEL2_WILDCARD_IP
)
290 def test_one_a_query_exact_match_l2(self
):
291 """Query an entry that matches the wild card but has an exact match as
294 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
298 name
= "%s.%s" % (LEVEL2_EXACT
, self
.get_dns_domain())
299 q
= self
.make_name_question(name
,
304 self
.finish_name_packet(p
, questions
)
305 (response
, response_packet
) =\
306 self
.dns_transaction_udp(p
, host
=self
.server_ip
)
307 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
308 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
309 self
.assertEqual(response
.ancount
, 1)
310 self
.assertEqual(response
.answers
[0].rr_type
, dns
.DNS_QTYPE_A
)
311 self
.assertEqual(response
.answers
[0].rdata
, LEVEL2_EXACT_IP
)
313 def test_one_a_query_wildcard_entry_l2(self
):
314 "Query the level 2 wildcard entry"
316 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
320 name
= "%s.%s" % (LEVEL2_WILDCARD
, self
.get_dns_domain())
321 q
= self
.make_name_question(name
,
326 self
.finish_name_packet(p
, questions
)
327 (response
, response_packet
) =\
328 self
.dns_transaction_udp(p
, host
=self
.server_ip
)
329 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
330 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
331 self
.assertEqual(response
.ancount
, 1)
332 self
.assertEqual(response
.answers
[0].rr_type
, dns
.DNS_QTYPE_A
)
333 self
.assertEqual(response
.answers
[0].rdata
, LEVEL2_WILDCARD_IP
)
336 TestProgram(module
=__name__
, opts
=subunitopts
)