1 # Test joining as a DC and check the join was done right
3 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
23 from samba
.tests
.dns_base
import DNSTKeyTest
24 from samba
.join
import DCJoinContext
25 from samba
.dcerpc
import drsuapi
, misc
, dns
26 from samba
.credentials
import Credentials
27 from samba
.provision
import interface_ips_v4
30 def get_logger(name
="subunit"):
31 """Get a logger object."""
33 logger
= logging
.getLogger(name
)
34 logger
.addHandler(logging
.StreamHandler(sys
.stderr
))
38 class JoinTestCase(DNSTKeyTest
):
40 self
.server
= samba
.tests
.env_get_var_value("SERVER")
41 self
.server_ip
= samba
.tests
.env_get_var_value("SERVER_IP")
43 self
.lp
= samba
.tests
.env_loadparm()
44 self
.creds
= self
.get_credentials()
45 self
.netbios_name
= "jointest1"
48 self
.join_ctx
= DCJoinContext(server
=self
.server
, creds
=self
.creds
,
49 lp
=self
.get_loadparm(),
50 netbios_name
=self
.netbios_name
,
51 targetdir
=self
.tempdir
,
52 domain
=None, logger
=logger
,
53 dns_backend
="SAMBA_INTERNAL")
54 self
.join_ctx
.userAccountControl
= (samba
.dsdb
.UF_SERVER_TRUST_ACCOUNT |
55 samba
.dsdb
.UF_TRUSTED_FOR_DELEGATION
)
57 self
.join_ctx
.replica_flags |
= (drsuapi
.DRSUAPI_DRS_WRIT_REP |
58 drsuapi
.DRSUAPI_DRS_FULL_SYNC_IN_PROGRESS
)
59 self
.join_ctx
.domain_replica_flags
= self
.join_ctx
.replica_flags
60 self
.join_ctx
.secure_channel_type
= misc
.SEC_CHAN_BDC
62 self
.join_ctx
.cleanup_old_join()
64 self
.join_ctx
.force_all_ips
= True
66 self
.join_ctx
.do_join()
70 paths
= self
.join_ctx
.paths
71 except AttributeError:
75 shutil
.rmtree(paths
.private_dir
)
76 shutil
.rmtree(paths
.state_dir
)
77 self
.rm_dirs("etc", "msg.lock", "bind-dns")
78 self
.rm_files("names.tdb")
80 self
.join_ctx
.cleanup_old_join(force
=True)
84 def test_join_makes_records(self
):
85 "create a query packet containing one query record via TCP"
86 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
89 name
= self
.join_ctx
.dnshostname
90 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
94 IPs
= interface_ips_v4(self
.lp
, all_interfaces
=True)
96 self
.finish_name_packet(p
, questions
)
97 (response
, response_packet
) = self
.dns_transaction_tcp(p
, host
=self
.server_ip
)
98 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
99 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
100 self
.assertEqual(response
.ancount
, len(IPs
))
103 name
= "%s._msdcs.%s" % (self
.join_ctx
.ntds_guid
, self
.join_ctx
.dnsforest
)
104 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
107 self
.finish_name_packet(p
, questions
)
108 (response
, response_packet
) = self
.dns_transaction_tcp(p
, host
=self
.server_ip
)
109 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
110 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
112 self
.assertEqual(response
.ancount
, 1 + len(IPs
))
113 self
.assertEqual(response
.answers
[0].rr_type
, dns
.DNS_QTYPE_CNAME
)
114 self
.assertEqual(response
.answers
[0].rdata
, self
.join_ctx
.dnshostname
)
115 self
.assertEqual(response
.answers
[1].rr_type
, dns
.DNS_QTYPE_A
)
117 def test_join_records_can_update(self
):
118 dc_creds
= Credentials()
119 dc_creds
.guess(self
.join_ctx
.lp
)
120 dc_creds
.set_machine_account(self
.join_ctx
.lp
)
122 self
.tkey_trans(creds
=dc_creds
)
124 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
125 q
= self
.make_name_question(self
.join_ctx
.dnsdomain
,
130 self
.finish_name_packet(p
, questions
)
133 # Delete the old expected IPs
134 IPs
= interface_ips_v4(self
.lp
, all_interfaces
=True)
138 r
.name
= self
.join_ctx
.dnshostname
139 r
.rr_type
= dns
.DNS_QTYPE_AAAA
140 r
.rr_class
= dns
.DNS_QCLASS_NONE
146 r
.name
= self
.join_ctx
.dnshostname
147 r
.rr_type
= dns
.DNS_QTYPE_A
148 r
.rr_class
= dns
.DNS_QCLASS_NONE
156 p
.nscount
= len(updates
)
159 mac
= self
.sign_packet(p
, self
.tkey
['name'])
160 (response
, response_p
) = self
.dns_transaction_udp(p
, self
.server_ip
)
161 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
162 self
.verify_packet(response
, response_p
, mac
)
164 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
167 name
= self
.join_ctx
.dnshostname
168 q
= self
.make_name_question(name
, dns
.DNS_QTYPE_A
, dns
.DNS_QCLASS_IN
)
171 self
.finish_name_packet(p
, questions
)
172 (response
, response_packet
) = self
.dns_transaction_tcp(p
, host
=self
.server_ip
)
173 self
.assert_dns_rcode_equals(response
, dns
.DNS_RCODE_OK
)
174 self
.assert_dns_opcode_equals(response
, dns
.DNS_OPCODE_QUERY
)
175 self
.assertEqual(response
.ancount
, 1)