1 # Tests of malformed DNS packets
2 # Copyright (C) Catalyst.NET ltd
4 # written by Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 """Sanity tests for DNS and NBT server parsing.
21 We don't use a proper client library so we can make improper packets.
28 from samba
.dcerpc
import dns
, nbt
30 from samba
.tests
import TestCase
35 for i
in range(1, 0xffff):
39 SERVER
= os
.environ
['SERVER_IP']
40 SERVER_NAME
= f
"{os.environ['SERVER']}.{os.environ['REALM']}"
def encode_netbios_bytes(chars):
    """Even RFC 1002 uses distancing quotes when calling this "compression".

    Apply NetBIOS first-level encoding to *chars* (a bytes-like name).

    The name is space-padded or truncated to exactly 16 bytes, then each
    byte is split into its high and low nibble and each nibble is emitted
    as the ASCII letter ``'A' + nibble``.  The result is therefore always
    32 bytes long and is returned as ``bytes`` so that it can be used
    directly as a label component when building a query packet.
    """
    # Pad explicitly to the fixed width rather than relying on the length
    # of a literal run of spaces; a short pad would leave short names
    # under-length after the slice.
    padded = chars.ljust(16, b' ')[:16]

    out = bytearray()
    for c in padded:
        out.append((c >> 4) + 65)   # high nibble -> 'A'..'P'
        out.append((c & 15) + 65)   # low nibble  -> 'A'..'P'
    return bytes(out)
54 class TestDnsPacketBase(TestCase
):
58 # we need to ensure the DNS server is responsive before
61 ok
= self
._known
_good
_query
()
64 print(f
"the server is STILL unresponsive after {40 * TIMEOUT} seconds")
66 def decode_reply(self
, data
):
68 id, flags
, n_q
, n_a
, n_rec
, n_exta
= struct
.unpack('!6H',
74 def construct_query(self
, names
):
75 """Create a query packet containing one query record.
77 *names* is either a single string name in the usual dotted
78 form, or a list of names. In the latter case, each name can
79 be a dotted string or a list of byte components, which allows
80 dots in components. Where I say list, I mean non-string
85 # these 3 are all the same
88 [[b"example", b"com"]]
90 # this is three names in the same request
92 [b"example", b"com", b"..!"],
93 (b"first component", b" 2nd component")]
95 header
= struct
.pack('!6H',
97 0x0100, # query, with recursion
98 len(names
), # number of queries
101 0x0000, # no extra records
103 tail
= struct
.pack('!BHH',
106 0x0001, # class IN-ternet
110 if isinstance(name
, str):
111 bits
= name
.encode('utf8').split(b
'.')
116 encoded_bits
.append(b
'%c%s' % (len(b
), b
))
117 encoded_bits
.append(tail
)
119 return header
+ b
''.join(encoded_bits
)
121 def _test_query(self
, names
=(), expected_rcode
=None):
123 if isinstance(names
, str):
126 packet
= self
.construct_query(names
)
127 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_DGRAM
)
128 s
.sendto(packet
, self
.server
)
129 r
, _
, _
= select
.select([s
], [], [], TIMEOUT
)
131 # It is reasonable to not reply to these packets (Windows
132 # doesn't), but it is not reasonable to render the server
135 ok
= self
._known
_good
_query
()
136 self
.assertTrue(ok
, "the server is unresponsive")
138 def _known_good_query(self
):
139 if self
.server
[1] == 53:
141 expected_rcode
= dns
.DNS_RCODE_OK
143 name
= [encode_netbios_bytes(b
'nxdomain'), b
'nxdomain']
144 expected_rcode
= nbt
.NBT_RCODE_NAM
146 packet
= self
.construct_query([name
])
147 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_DGRAM
)
148 s
.sendto(packet
, self
.server
)
149 r
, _
, _
= select
.select([s
], [], [], TIMEOUT
)
154 data
, addr
= s
.recvfrom(4096)
156 rcode
= self
.decode_reply(data
)['rcode']
157 return expected_rcode
== rcode
159 def _test_empty_packet(self
):
162 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_DGRAM
)
163 s
.sendto(packet
, self
.server
)
166 # It is reasonable not to reply to an empty packet
167 # but it is not reasonable to render the server
169 ok
= self
._known
_good
_query
()
170 self
.assertTrue(ok
, "the server is unresponsive")
173 class TestDnsPackets(TestDnsPacketBase
):
174 server
= (SERVER
, 53)
175 qtype
= 1 # dns type A
177 def _test_many_repeated_components(self
, label
, n
, expected_rcode
=None):
179 self
._test
_query
([name
],
180 expected_rcode
=expected_rcode
)
182 def test_127_very_dotty_components(self
):
184 self
._test
_many
_repeated
_components
(label
, 127)
186 def test_127_half_dotty_components(self
):
187 label
= b
'x.' * 31 + b
'x'
188 self
._test
_many
_repeated
_components
(label
, 127)
190 def test_empty_packet(self
):
191 self
._test
_empty
_packet
()
194 class TestNbtPackets(TestDnsPacketBase
):
195 server
= (SERVER
, 137)
196 qtype
= 0x20 # NBT_QTYPE_NETBIOS
198 def _test_nbt_encode_query(self
, names
, *args
, **kwargs
):
199 if isinstance(names
, str):
204 if isinstance(name
, str):
205 bits
= name
.encode('utf8').split(b
'.')
209 encoded
= [encode_netbios_bytes(bits
[0])]
210 encoded
.extend(bits
[1:])
211 nbt_names
.append(encoded
)
213 self
._test
_query
(nbt_names
, *args
, **kwargs
)
215 def _test_many_repeated_components(self
, label
, n
, expected_rcode
=None):
217 name
[0] = encode_netbios_bytes(label
)
218 self
._test
_query
([name
],
219 expected_rcode
=expected_rcode
)
221 def test_127_very_dotty_components(self
):
223 self
._test
_many
_repeated
_components
(label
, 127)
225 def test_127_half_dotty_components(self
):
226 label
= b
'x.' * 31 + b
'x'
227 self
._test
_many
_repeated
_components
(label
, 127)
229 def test_empty_packet(self
):
230 self
._test
_empty
_packet
()