1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin <kai@samba.org> 2011
3 # Copyright (C) Catalyst.NET 2021
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 from samba
import dsdb
21 from samba
import dsdb_dns
22 from samba
.ndr
import ndr_unpack
, ndr_pack
23 from samba
.samdb
import SamDB
24 from samba
.auth
import system_session
26 from samba
import credentials
27 from samba
.dcerpc
import dns
, dnsp
, dnsserver
28 from samba
.dnsserver
import TXTRecord
, ARecord
29 from samba
.dnsserver
import ipv6_normalise
30 from samba
.tests
.subunitrun
import SubunitOptions
, TestProgram
31 from samba
import werror
, WERRORError
32 from samba
.tests
.dns_base
import DNSTest
33 import samba
.getopt
as options
36 from samba
.colour
import c_RED
, c_GREEN
, c_DARK_YELLOW
38 parser
= optparse
.OptionParser(
39 "dns_aging.py <server name> <server ip> [options]")
40 sambaopts
= options
.SambaOptions(parser
)
41 parser
.add_option_group(sambaopts
)
44 # use command line creds if available
45 credopts
= options
.CredentialsOptions(parser
)
46 parser
.add_option_group(credopts
)
47 subunitopts
= SubunitOptions(parser
)
48 parser
.add_option_group(subunitopts
)
50 opts
, args
= parser
.parse_args()
55 LP
= sambaopts
.get_loadparm()
56 CREDS
= credopts
.get_credentials(LP
)
59 CREDS
.set_krb_forwardable(credentials
.NO_KRB_FORWARDABLE
)
61 DOMAIN
= CREDS
.get_realm().lower()
63 # Unix time start, in DNS timestamp (24 * 365.25 * 369)
64 # These are ballpark extremes for the timestamp.
65 DNS_TIMESTAMP_1970
= 3234654
66 DNS_TIMESTAMP_2101
= 4383000
67 DNS_TIMESTAMP_1981
= 3333333 # a middling timestamp
69 IPv4_ADDR
= "127.0.0.33"
71 IPv4_ADDR_2
= "127.0.0.66"
76 return SamDB(url
=f
"ldap://{SERVER_IP}",
78 session_info
=system_session(),
83 # For Samba only direct file access, needed for the tombstoning functions.
84 # (For Windows, we instruct it to tombstone over RPC).
85 return SamDB(url
=LP
.samdb_url(),
87 session_info
=system_session(),
92 return dnsserver
.dnsserver(f
"ncacn_ip_tcp:{SERVER_IP}[sign]", LP
, CREDS
)
95 def create_zone(name
, rpc
=None, aging
=True):
98 z
= dnsserver
.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
100 z
.dwZoneType
= dnsp
.DNS_ZONE_TYPE_PRIMARY
101 z
.fAging
= int(bool(aging
))
102 z
.dwDpFlags
= dnsserver
.DNS_DP_DOMAIN_DEFAULT
105 z
.fAllowUpdate
= dnsp
.DNS_ZONE_UPDATE_UNSECURE
106 rpc
.DnssrvOperation2(dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
112 dnsserver
.DNSSRV_TYPEID_ZONE_CREATE
,
116 def delete_zone(name
, rpc
=None):
119 rpc
.DnssrvOperation2(dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
125 dnsserver
.DNSSRV_TYPEID_NULL
,
130 """Construct a txt record string list, which is a fiddly matter."""
131 if isinstance(txt
, str):
133 s_list
= dnsp
.string_list()
134 s_list
.count
= len(txt
)
139 def make_txt_record(txt
):
141 r
.txt
= txt_s_list(txt
)
146 copy
= dnsserver
.DNS_RPC_RECORD()
147 copy
.wType
= rec
.wType
148 copy
.dwFlags
= rec
.dwFlags
149 copy
.dwSerial
= rec
.dwSerial
150 copy
.dwTtlSeconds
= rec
.dwTtlSeconds
152 copy
.dwTimeStamp
= rec
.dwTimeStamp
156 def guess_wtype(data
):
157 if isinstance(data
, list):
158 data
= make_txt_record(data
)
159 return (data
, dnsp
.DNS_TYPE_TXT
)
161 return (data
, dnsp
.DNS_TYPE_AAAA
)
162 return (data
, dnsp
.DNS_TYPE_A
)
165 class TestDNSAging(DNSTest
):
166 """Probe DNS aging and scavenging, using LDAP and RPC to set and test
167 the timestamps behind DNS's back."""
169 server_ip
= SERVER_IP
174 self
.rpc_conn
= get_rpc()
175 self
.samdb
= get_samdb()
177 # We always have a zone of our own named after the test function.
178 self
.zone
= self
.id().rsplit('.', 1)[1]
179 self
.addCleanup(delete_zone
, self
.zone
, self
.rpc_conn
)
181 create_zone(self
.zone
, self
.rpc_conn
)
182 except WERRORError
as e
:
183 if e
.args
[0] != werror
.WERR_DNS_ERROR_ZONE_ALREADY_EXISTS
:
185 print(f
"zone {self.zone} already exists")
187 # Though we set this in create_zone(), that doesn't work on
188 # Windows, so we repeat again here.
189 self
.set_zone_int_params(AllowUpdate
=dnsp
.DNS_ZONE_UPDATE_UNSECURE
)
191 self
.zone_dn
= (f
"DC={self.zone},CN=MicrosoftDNS,DC=DomainDNSZones,"
192 f
"{self.samdb.get_default_basedn()}")
194 def set_zone_int_params(self
, zone
=None, **kwargs
):
195 """Keyword arguments set parameters on the zone. e.g.:
197 self.set_zone_int_params(Aging=1,
200 See [MS-DNSP] 3.1.1.2.1 "DNS Zone Integer Properties" for names.
204 for key
, val
in kwargs
.items():
205 name_param
= dnsserver
.DNS_RPC_NAME_AND_PARAM()
206 name_param
.dwParam
= val
207 name_param
.pszNodeName
= key
209 self
.rpc_conn
.DnssrvOperation2(
210 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
215 'ResetDwordProperty',
216 dnsserver
.DNSSRV_TYPEID_NAME_AND_PARAM
,
218 except WERRORError
as e
:
221 def rpc_replace(self
, name
, old
=None, new
=None):
222 """Replace a DNS_RPC_RECORD or DNS_RPC_RECORD_BUF"""
223 # wrap our recs, if necessary
224 if isinstance(new
, dnsserver
.DNS_RPC_RECORD
):
226 new
= dnsserver
.DNS_RPC_RECORD_BUF()
229 if isinstance(old
, dnsserver
.DNS_RPC_RECORD
):
231 old
= dnsserver
.DNS_RPC_RECORD_BUF()
235 self
.rpc_conn
.DnssrvUpdateRecord2(
236 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
243 except WERRORError
as e
:
244 self
.fail(f
"could not replace record ({e})")
246 def get_unique_txt_record(self
, name
, txt
):
247 """Get the TXT record on Name with value txt, asserting that there is
249 if isinstance(txt
, str):
251 recs
= self
.ldap_get_records(name
)
255 if r
.wType
!= dnsp
.DNS_TYPE_TXT
:
257 txt2
= [x
for x
in r
.data
.str]
259 self
.assertIsNone(match
)
263 def get_unique_ip_record(self
, name
, addr
, wtype
=None):
264 """Get an A or AAAA record on name with the matching data."""
266 addr
, wtype
= guess_wtype(addr
)
268 recs
= self
.ldap_get_records(name
)
270 # We need to use the internal dns_record_match because not all
271 # forms always match on strings (e.g. IPv6)
272 rec
= dnsp
.DnssrvRpcRecord()
278 if dsdb_dns
.records_match(r
, rec
):
279 self
.assertIsNone(match
)
283 def dns_query(self
, name
, qtype
=dns
.DNS_QTYPE_ALL
):
284 """make a query, which might help Windows notice LDAP changes"""
285 p
= self
.make_name_packet(dns
.DNS_OPCODE_QUERY
)
286 fullname
= "%s.%s" % (name
, self
.zone
)
287 q
= self
.make_name_question(fullname
, qtype
, dns
.DNS_QCLASS_IN
)
288 self
.finish_name_packet(p
, [q
])
289 r
, rp
= self
.dns_transaction_udp(p
, host
=SERVER_IP
)
293 def dns_update_non_text(self
, name
,
296 qclass
=dns
.DNS_QCLASS_IN
):
298 data
, wtype
= guess_wtype(data
)
300 if qclass
== dns
.DNS_QCLASS_IN
:
305 fullname
= "%s.%s" % (name
, self
.zone
)
306 p
= self
.make_name_packet(dns
.DNS_OPCODE_UPDATE
)
307 u
= self
.make_name_question(self
.zone
,
310 self
.finish_name_packet(p
, [u
])
326 (code
, response
) = self
.dns_transaction_udp(p
, host
=SERVER_IP
)
327 self
.assert_dns_rcode_equals(code
, dns
.DNS_RCODE_OK
)
330 def dns_delete(self
, name
, data
, wtype
=None):
331 return self
.dns_update_non_text(name
,
334 qclass
=dns
.DNS_QCLASS_NONE
)
336 def dns_delete_type(self
, name
, wtype
):
337 return self
.dns_update_non_text(name
,
340 qclass
=dns
.DNS_QCLASS_ANY
)
342 def dns_update_record(self
, name
, txt
, ttl
=900):
343 if isinstance(txt
, str):
345 p
= self
.make_txt_update(name
, txt
, self
.zone
, ttl
=ttl
)
346 (code
, response
) = self
.dns_transaction_udp(p
, host
=SERVER_IP
)
347 if code
.operation
& dns
.DNS_RCODE
== dns
.DNS_RCODE_REFUSED
:
348 # sometimes you might forget this
349 print("\n\ngot DNS_RCODE_REFUSED\n")
350 print("Are you running this in the fl2003 environment?\n")
351 print("try `SELFTEST_TESTENV='fl2003dc:local' make testenv`\n\n")
353 self
.assert_dns_rcode_equals(code
, dns
.DNS_RCODE_OK
)
354 return self
.get_unique_txt_record(name
, txt
)
356 def rpc_update_record(self
, name
, txt
, **kwargs
):
357 """Add the record that self.dns_update_record() would add, via the
360 As with DNS update, if the record already exists, we replace it.
362 if isinstance(txt
, str):
367 for k
, v
in kwargs
.items():
371 self
.rpc_replace(name
, old
, rec
)
372 except AssertionError as e
:
373 # we have caught and wrapped the WERRor inside
374 if 'WERR_DNS_ERROR_RECORD_DOES_NOT_EXIST' not in str(e
):
376 self
.rpc_replace(name
, None, rec
)
378 return self
.get_unique_txt_record(name
, txt
)
380 def rpc_delete_txt(self
, name
, txt
):
381 if isinstance(txt
, str):
384 self
.rpc_replace(name
, old
, None)
386 def get_one_node(self
, name
):
387 self
.assertIsInstance(name
, str)
388 expr
= f
"(&(objectClass=dnsNode)(name={name}))"
389 nodes
= self
.samdb
.search(base
=self
.zone_dn
,
390 scope
=ldb
.SCOPE_SUBTREE
,
392 attrs
=["dnsRecord", "dNSTombstoned", "name"])
396 f
"expected 0 or 1 dnsNodes for {name}, found {len(nodes)}")
402 def ldap_get_records(self
, name
):
403 node
= self
.get_one_node(name
)
407 records
= node
.get('dnsRecord')
408 return [ndr_unpack(dnsp
.DnssrvRpcRecord
, r
) for r
in records
]
410 def ldap_get_non_tombstoned_records(self
, name
):
411 all_records
= self
.ldap_get_records(name
)
413 for r
in all_records
:
414 if r
.wType
!= dnsp
.DNS_TYPE_TOMBSTONE
:
418 def assert_tombstoned(self
, name
, tombstoned
=True, timestamp
=None):
419 # If run with tombstoned=False, assert it isn't tombstoned
420 # (and has no traces of tombstone). Otherwise assert it has
421 # all the necessary bits.
423 # with timestamp=<non-zero number of hours>, we assert that
424 # the nttime timestamp is about that time.
426 # with timestamp=None, we assert it is within a century or so.
428 # with timestamp=False (or 0), we don't assert on it.
430 node
= self
.get_one_node(name
)
432 self
.fail(f
"no node named {name}")
434 dnsts
= node
.get("dNSTombstoned")
436 is_tombstoned
= False
438 self
.assertEqual(len(dnsts
), 1)
439 if dnsts
[0] == b
'TRUE':
442 is_tombstoned
= False
444 if tombstoned
!= is_tombstoned
:
446 self
.fail(f
"{name} is tombstoned")
448 self
.fail(f
"{name} is not tombstoned")
450 recs
= self
.ldap_get_records(name
)
452 self
.assertEqual(len(recs
), 1)
453 self
.assertEqual(recs
[0].wType
, dnsp
.DNS_TYPE_TOMBSTONE
)
454 if timestamp
is None:
455 self
.assert_nttime_in_hour_range(recs
[0].data
)
457 self
.assert_nttime_in_hour_range(recs
[0].data
,
463 self
.assertNotEqual(recs
[0].wType
, dnsp
.DNS_TYPE_TOMBSTONE
)
465 def ldap_replace_records(self
, name
, records
):
466 # We use raw ldap to avoid the "helpfulness" of dsdb_dns.replace()
468 dn
= f
'DC={name},{self.zone_dn}'
470 msg
= ldb
.Message
.from_dict(self
.samdb
,
472 'dnsRecord': [ndr_pack(r
) for r
in records
]
474 ldb
.FLAG_MOD_REPLACE
)
477 self
.samdb
.modify(msg
)
478 except ldb
.LdbError
as e
:
479 if 'LDAP_NO_SUCH_OBJECT' not in e
.args
[1]:
481 # We need to do an add
482 msg
["objectClass"] = ["top", "dnsNode"]
483 msg
["dnsRecord"].set_flags(ldb
.FLAG_MOD_ADD
)
486 def ldap_update_core(self
, name
, wtype
, data
, **kwargs
):
487 """This one is not TXT specific."""
488 records
= self
.ldap_get_records(name
)
491 rec
= dnsp
.DnssrvRpcRecord()
493 rec
.rank
= dnsp
.DNS_RANK_ZONE
494 rec
.dwTtlSeconds
= 900
499 # override defaults, as required
500 for k
, v
in kwargs
.items():
503 for i
, r
in enumerate(records
[:]):
504 if dsdb_dns
.records_match(r
, rec
):
507 else: # record not found
510 self
.ldap_replace_records(name
, records
)
513 def ldap_update_record(self
, name
, txt
, **kwargs
):
514 """Add the record that self.dns_update_record() would add, via ldap,
515 thus allowing us to set additional dnsRecord features like
518 rec
= self
.ldap_update_core(name
,
523 recs
= self
.ldap_get_records(name
)
526 if r
.wType
!= rec
.wType
:
528 if r
.data
.str == rec
.data
.str:
529 self
.assertIsNone(match
, f
"duplicate records for {name}")
531 self
.assertEqual(match
.rank
, rec
.rank
& 255)
532 self
.assertEqual(match
.dwTtlSeconds
, rec
.dwTtlSeconds
)
533 self
.assert_timestamps_equal(match
.dwTimeStamp
, rec
.dwTimeStamp
)
536 def ldap_delete_record(self
, name
, data
, wtype
=dnsp
.DNS_TYPE_TXT
):
537 rec
= dnsp
.DnssrvRpcRecord()
538 if wtype
== dnsp
.DNS_TYPE_TXT
:
539 data
= txt_s_list(data
)
543 records
= self
.ldap_get_records(name
)
544 for i
, r
in enumerate(records
[:]):
545 if dsdb_dns
.records_match(r
, rec
):
549 self
.fail(f
"record {data} not found")
551 self
.ldap_replace_records(name
, records
)
553 def add_ip_record(self
, name
, addr
, wtype
=None, **kwargs
):
555 addr
, wtype
= guess_wtype(addr
)
556 rec
= self
.ldap_update_core(name
,
561 recs
= self
.ldap_get_records(name
)
564 if dsdb_dns
.records_match(r
, rec
):
565 self
.assertIsNone(match
, f
"duplicate records for {name}")
567 self
.assertEqual(match
.rank
, rec
.rank
& 255)
568 self
.assertEqual(match
.dwTtlSeconds
, rec
.dwTtlSeconds
)
569 self
.assert_timestamps_equal(match
.dwTimeStamp
, rec
.dwTimeStamp
)
572 def ldap_modify_timestamps(self
, name
, delta
):
573 records
= self
.ldap_get_records(name
)
575 rec
.dwTimeStamp
+= delta
576 self
.ldap_replace_records(name
, records
)
578 def get_rpc_records(self
, name
, dns_type
=None):
580 dns_type
= dnsp
.DNS_TYPE_ALL
581 select_flags
= dnsserver
.DNS_RPC_VIEW_AUTHORITY_DATA
582 buflen
, res
= self
.rpc_conn
.DnssrvEnumRecords2(
583 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
594 if not res
or res
.count
== 0:
597 recs
.extend(rec
.records
)
600 def dns_tombstone(self
, name
,
601 epoch_hours
=DNS_TIMESTAMP_1981
,
603 dn
= f
'DC={name},{self.zone_dn}'
604 r
= dnsp
.DnssrvRpcRecord()
605 r
.wType
= dnsp
.DNS_TYPE_TOMBSTONE
606 # r.dwTimeStamp is a 32 bit value in hours, and r.data is an
607 # NTTIME (100 nanosecond intervals), both in the 1601 epoch. A
608 # tombstone will have both, but expiration calculations use
609 # the r.data NTTIME EntombedTime timestamp (see [MS-DNSP]).
610 r
.dwTimeStamp
= epoch_hours
611 if epoch_nttime
is None:
612 r
.data
= epoch_hours
* 3600 * 10 * 1000 * 1000
614 r
.data
= epoch_nttime
616 msg
= ldb
.Message
.from_dict(self
.samdb
,
618 'dnsRecord': [ndr_pack(r
)],
619 'dnsTombstoned': 'TRUE'
621 ldb
.FLAG_MOD_REPLACE
)
623 self
.samdb
.modify(msg
)
624 except ldb
.LdbError
as e
:
625 if 'LDAP_NO_SUCH_OBJECT' not in e
.args
[1]:
627 # We need to do an add
628 msg
["objectClass"] = ["top", "dnsNode"]
631 def set_aging(self
, enable
=False):
632 self
.set_zone_int_params(Aging
=int(bool(enable
)))
634 def assert_timestamp_in_ballpark(self
, rec
):
635 self
.assertGreater(rec
.dwTimeStamp
, DNS_TIMESTAMP_1970
)
636 self
.assertLess(rec
.dwTimeStamp
, DNS_TIMESTAMP_2101
)
638 def assert_nttime_in_hour_range(self
, t
,
639 hour_min
=DNS_TIMESTAMP_1970
,
640 hour_max
=DNS_TIMESTAMP_2101
):
641 t
//= int(3600 * 1e7
)
642 self
.assertGreater(t
, hour_min
)
643 self
.assertLess(t
, hour_max
)
645 def assert_soon_after(self
, timestamp
, reference
):
646 """Assert that a timestamp is the same or very slightly higher than a
649 Typically we expect the timestamps to be identical, unless an
650 hour has clicked over since the reference was taken. However
651 we allow one more hour in case it happens during a daylight
652 savings transition or something.
654 if hasattr(timestamp
, 'dwTimeStamp'):
655 timestamp
= timestamp
.dwTimeStamp
656 if hasattr(reference
, 'dwTimeStamp'):
657 reference
= reference
.dwTimeStamp
659 diff
= timestamp
- reference
660 days
= abs(diff
/ 24.0)
663 msg
= f
"timestamp is {days} days ({abs(diff)} hours) before reference"
665 msg
= f
"timestamp is {days} days ({diff} hours) after reference"
668 raise AssertionError(msg
)
670 def assert_timestamps_equal(self
, ts1
, ts2
):
671 """Just like assertEqual(), but tells us the difference, not the
672 absolute values. e.g:
674 self.assertEqual(a, b)
675 AssertionError: 3685491 != 3685371
677 self.assert_timestamps_equal(a, b)
678 AssertionError: -120 (first is 5.0 days earlier than second)
680 Also, we turn a record into a timestamp if we need
682 if hasattr(ts1
, 'dwTimeStamp'):
683 ts1
= ts1
.dwTimeStamp
684 if hasattr(ts2
, 'dwTimeStamp'):
685 ts2
= ts2
.dwTimeStamp
691 days
= abs(diff
/ 24.0)
692 if ts1
== 0 or ts2
== 0:
693 # when comparing to zero we don't want the number of days.
694 msg
= f
"timestamp {ts1} != {ts2}"
696 msg
= f
"{ts1} is {days} days ({diff} hours) after {ts2}"
698 msg
= f
"{ts1} is {days} days ({abs(diff)} hours) before {ts2}"
700 raise AssertionError(msg
)
702 def test_update_timestamps_aging_off_then_on(self
):
703 # we will add a record with aging off
704 # it will have the current timestamp
705 self
.set_aging(False)
706 name
= 'timestamp-now'
707 name2
= 'timestamp-eightdays'
709 rec
= self
.dns_update_record(name
, [name
])
710 start_time
= rec
.dwTimeStamp
711 self
.assert_timestamp_in_ballpark(rec
)
712 # alter the timestamp -8 days using RPC
713 # with aging turned off, we expect no change
714 # when aging is on, we expect change
715 eight_days_ago
= start_time
- 8 * 24
716 rec
= self
.ldap_update_record(name2
, [name2
],
717 dwTimeStamp
=eight_days_ago
)
719 self
.assert_timestamps_equal(rec
.dwTimeStamp
, eight_days_ago
)
721 # if aging was on, this would change
722 rec
= self
.dns_update_record(name2
, [name2
])
723 self
.assert_timestamps_equal(rec
.dwTimeStamp
, eight_days_ago
)
726 rec
= self
.dns_update_record(name2
, [name2
])
727 self
.assertGreaterEqual(rec
.dwTimeStamp
, start_time
)
729 def test_rpc_update_timestamps(self
):
730 # RPC always sets timestamps to zero on Windows.
731 self
.set_aging(False)
732 name
= 'timestamp-now'
734 rec
= self
.dns_update_record(name
, [name
])
735 start_time
= rec
.dwTimeStamp
736 self
.assert_timestamp_in_ballpark(rec
)
737 # attempt to alter the timestamp to something close by.
738 eight_days_ago
= start_time
- 8 * 24
739 rec
= self
.rpc_update_record(name
, [name
],
740 dwTimeStamp
=eight_days_ago
)
741 self
.assertEqual(rec
.dwTimeStamp
, 0)
743 # try again, with aging on
745 rec
= self
.rpc_update_record(name
, [name
],
746 dwTimeStamp
=eight_days_ago
)
747 self
.assertEqual(rec
.dwTimeStamp
, 0)
749 # now that the record is static, a dns update won't change it
750 rec
= self
.dns_update_record(name
, [name
])
751 self
.assertEqual(rec
.dwTimeStamp
, 0)
753 # but another record on the same node will behave normally
754 # i.e. the node is not static, the record is.
755 name2
= 'timestamp-eightdays'
756 rec
= self
.dns_update_record(name2
, [name2
])
757 self
.assert_soon_after(rec
.dwTimeStamp
,
760 def get_txt_timestamps(self
, name
, *txts
):
761 records
= self
.ldap_get_records(name
)
766 t2
= [x
for x
in r
.data
.str]
768 ret
.append(r
.dwTimeStamp
)
771 def test_update_aging_disabled_2(self
):
772 # With aging disabled, Windows updates the timestamps of all
773 # records when one is updated.
776 txt2
= ['test', 'txt2']
777 txt3
= ['test', 'txt3']
779 self
.set_aging(False)
781 current_time
= self
.dns_update_record(name
, txt1
).dwTimeStamp
783 six_days_ago
= current_time
- 6 * 24
784 eight_days_ago
= current_time
- 8 * 24
785 fifteen_days_ago
= current_time
- 15 * 24
786 hundred_days_ago
= current_time
- 100 * 24
787 thousand_days_ago
= current_time
- 1000 * 24
789 for timestamp
in (current_time
,
796 self
.ldap_update_record(name
, txt1
, dwTimeStamp
=timestamp
)
797 self
.assertEqual(self
.get_txt_timestamps(name
, txt1
), [timestamp
])
800 update_timestamp
= self
.dns_update_record(name
, txt1
).dwTimeStamp
801 self
.assert_timestamps_equal(update_timestamp
, timestamp
)
803 # adding a fresh record
804 for timestamp
in (current_time
,
813 timestamp1
= self
.ldap_update_record(
816 dwTimeStamp
=timestamp
).dwTimeStamp
817 self
.assert_timestamps_equal(timestamp1
, timestamp
)
819 self
.dns_update_record(name
, txt2
)
820 timestamps
= self
.get_txt_timestamps(name
, txt1
, txt2
)
821 self
.assertEqual(timestamps
, [timestamp
, current_time
])
823 self
.ldap_delete_record(name
, txt2
)
824 timestamps
= self
.get_txt_timestamps(name
, txt1
)
825 self
.assertEqual(timestamps
, [timestamp
])
828 timestamp2
= self
.dns_update_record(name
, txt2
).dwTimeStamp
829 self
.assert_soon_after(timestamp2
, current_time
)
831 for timestamp
in (current_time
,
840 self
.ldap_update_record(name
, txt1
, dwTimeStamp
=timestamp
)
841 timestamp1
= self
.get_unique_txt_record(name
, txt1
).dwTimeStamp
842 self
.assert_timestamps_equal(timestamp1
, timestamp
)
844 timestamp2
= self
.dns_update_record(name
, txt2
).dwTimeStamp
845 # txt1 timestamp is now current time
846 timestamps
= self
.get_txt_timestamps(name
, txt1
, txt2
)
847 self
.assertEqual(timestamps
, [timestamp
, current_time
])
849 # with 3 records, no change
850 for timestamp
in (current_time
,
859 self
.ldap_update_record(name
, txt1
, dwTimeStamp
=timestamp
)
860 self
.ldap_update_record(name
, txt2
, dwTimeStamp
=timestamp
)
861 self
.ldap_update_record(name
, txt3
, dwTimeStamp
=(timestamp
+ 30))
862 timestamp3
= self
.get_unique_txt_record(name
, txt3
).dwTimeStamp
863 self
.assert_timestamps_equal(timestamp3
, timestamp
+ 30)
865 self
.dns_update_record(name
, txt2
).dwTimeStamp
866 timestamps
= self
.get_txt_timestamps(name
, txt1
, txt2
, txt3
)
867 self
.assertEqual(timestamps
, [timestamp
,
871 # with 3 records, one of which is static
872 # first we set the updatee's timestamp to a recognisable number
873 self
.ldap_update_record(name
, txt2
, dwTimeStamp
=999999)
874 for timestamp
in (current_time
,
883 self
.ldap_update_record(name
, txt1
, dwTimeStamp
=0)
884 self
.ldap_update_record(name
, txt3
, dwTimeStamp
=(timestamp
- 9))
885 timestamp3
= self
.get_unique_txt_record(name
, txt3
).dwTimeStamp
886 self
.assert_timestamps_equal(timestamp3
, timestamp
- 9)
888 self
.dns_update_record(name
, txt2
)
889 timestamps
= self
.get_txt_timestamps(name
, txt1
, txt2
, txt3
)
890 self
.assertEqual(timestamps
, [0,
894 # with 3 records, updating one which is static
895 timestamp3
= self
.dns_update_record(name
, txt3
).dwTimeStamp
896 for timestamp
in (current_time
,
905 self
.ldap_update_record(name
, txt1
, dwTimeStamp
=0)
906 self
.ldap_update_record(name
, txt2
, dwTimeStamp
=0)
907 self
.ldap_update_record(name
, txt3
, dwTimeStamp
=(timestamp
+ 30))
908 timestamp3
= self
.get_unique_txt_record(name
, txt3
).dwTimeStamp
909 self
.assert_timestamps_equal(timestamp3
, timestamp
+ 30)
911 self
.dns_update_record(name
, txt2
).dwTimeStamp
912 timestamps
= self
.get_txt_timestamps(name
, txt1
, txt2
, txt3
)
913 self
.assertEqual(timestamps
, [0,
917 # with 3 records, after the static nodes have been replaced
918 self
.ldap_update_record(name
, txt1
, dwTimeStamp
=777777)
919 self
.ldap_update_record(name
, txt2
, dwTimeStamp
=888888)
920 timestamp3
= self
.dns_update_record(name
, txt3
).dwTimeStamp
921 for timestamp
in (current_time
,
930 self
.ldap_update_record(name
, txt3
, dwTimeStamp
=(timestamp
))
931 timestamp3
= self
.get_unique_txt_record(name
, txt3
).dwTimeStamp
932 self
.assert_timestamps_equal(timestamp3
, timestamp
)
934 self
.dns_update_record(name
, txt2
)
935 timestamps
= self
.get_txt_timestamps(name
, txt1
, txt2
, txt3
)
936 self
.assertEqual(timestamps
, [777777,
940 def _test_update_aging_disabled_n_days_ago(self
, n_days
):
945 self
.set_aging(False)
946 current_time
= self
.dns_update_record(name
, txt1
).dwTimeStamp
948 # rewind timestamp using ldap
949 self
.ldap_modify_timestamps(name
, n_days
* -24)
950 n_days_ago
= self
.get_unique_txt_record(name
, txt1
).dwTimeStamp
951 self
.assertGreater(current_time
, n_days_ago
)
953 # no change when updating this record
954 update_timestamp
= self
.dns_update_record(name
, txt1
).dwTimeStamp
955 self
.assert_timestamps_equal(update_timestamp
, n_days_ago
)
957 # add another record, which should have the current timestamp
958 timestamp2
= self
.dns_update_record(name
, txt2
).dwTimeStamp
959 self
.assert_soon_after(timestamp2
, current_time
)
961 # get the original record timestamp. NOW it matches current_time
962 timestamp1
= self
.get_unique_txt_record(name
, txt1
).dwTimeStamp
963 self
.assert_timestamps_equal(timestamp1
, timestamp2
)
965 # let's repeat that, this time with txt2 existing
966 self
.ldap_update_record(name
, txt1
, dwTimeStamp
=n_days_ago
)
968 timestamp1
= self
.dns_update_record(name
, txt1
).dwTimeStamp
969 self
.assert_timestamps_equal(timestamp1
, n_days_ago
)
971 # this update is not an add
972 timestamp2
= self
.dns_update_record(name
, txt2
).dwTimeStamp
973 self
.assert_soon_after(timestamp2
, current_time
)
975 # now timestamp1 is not changed
976 timestamp1
= self
.get_unique_txt_record(name
, txt1
).dwTimeStamp
977 self
.assert_timestamps_equal(timestamp1
, n_days_ago
)
979 # delete record2, try again
980 self
.ldap_delete_record(name
, txt2
)
981 self
.ldap_update_record(name
, txt1
, dwTimeStamp
=n_days_ago
)
983 timestamp1
= self
.dns_update_record(name
, txt1
).dwTimeStamp
984 self
.assert_timestamps_equal(timestamp1
, n_days_ago
)
986 # here we are re-adding the deleted record
987 timestamp2
= self
.dns_update_record(name
, txt2
).dwTimeStamp
988 self
.assert_soon_after(timestamp2
, current_time
)
990 timestamp1
= self
.get_unique_txt_record(name
, txt1
).dwTimeStamp
992 # It gets weird HERE.
993 # note how the SIBLING of the deleted, re-added record differs
994 # from the sibling of freshly added record, depending on the
997 self
.assert_timestamps_equal(timestamp1
, n_days_ago
)
999 self
.assert_timestamps_equal(timestamp1
, timestamp2
)
1001 # re-timestamp record2, try again
1002 self
.ldap_update_record(name
, txt2
, dwTimeStamp
=n_days_ago
)
1003 self
.ldap_update_record(name
, txt1
, dwTimeStamp
=n_days_ago
)
1005 timestamp1
= self
.dns_update_record(name
, txt1
).dwTimeStamp
1006 self
.assert_timestamps_equal(timestamp1
, n_days_ago
)
1009 timestamp2
= self
.dns_update_record(name
, txt2
).dwTimeStamp
1010 self
.assert_timestamps_equal(timestamp2
, n_days_ago
)
1012 timestamp1
= self
.get_unique_txt_record(name
, txt1
).dwTimeStamp
1013 self
.assert_timestamps_equal(timestamp1
, timestamp2
)
1015 # let's introduce another record
1017 self
.ldap_update_record(name
, txt2
, dwTimeStamp
=n_days_ago
)
1018 self
.ldap_update_record(name
, txt1
, dwTimeStamp
=n_days_ago
)
1020 timestamp3
= self
.dns_update_record(name
, txt3
).dwTimeStamp
1021 self
.assert_soon_after(timestamp3
, current_time
)
1023 timestamp1
= self
.get_unique_txt_record(name
, txt1
).dwTimeStamp
1024 timestamp2
= self
.get_unique_txt_record(name
, txt2
).dwTimeStamp
1027 self
.assert_timestamps_equal(timestamp1
, n_days_ago
)
1029 self
.assert_timestamps_equal(timestamp1
, timestamp3
)
1031 self
.assert_timestamps_equal(timestamp2
, timestamp3
)
1033 self
.ldap_delete_record(name
, txt3
)
1034 timestamp3
= self
.dns_update_record(name
, txt3
).dwTimeStamp
1035 self
.assert_soon_after(timestamp3
, current_time
)
1036 timestamp1
= self
.get_unique_txt_record(name
, txt1
).dwTimeStamp
1037 timestamp2
= self
.get_unique_txt_record(name
, txt2
).dwTimeStamp
1040 self
.assert_timestamps_equal(timestamp1
, n_days_ago
)
1042 self
.assert_timestamps_equal(timestamp1
, timestamp3
)
1044 self
.assert_timestamps_equal(timestamp2
, timestamp3
)
1046 # and here we'll make txt3 static
1049 # and here we'll make txt1 static
1050 self
.ldap_update_record(name
, txt1
, dwTimeStamp
=0)
1051 self
.ldap_update_record(name
, txt2
, dwTimeStamp
=n_days_ago
)
1052 self
.ldap_update_record(name
, txt3
, dwTimeStamp
=n_days_ago
)
1053 timestamp1
= self
.dns_update_record(name
, txt1
).dwTimeStamp
1054 timestamp2
= self
.get_unique_txt_record(name
, txt2
).dwTimeStamp
1055 timestamp3
= self
.get_unique_txt_record(name
, txt3
).dwTimeStamp
1056 timestamp4
= self
.dns_update_record(name
, txt4
).dwTimeStamp
1058 self
.assertEqual(timestamp1
, 0)
1059 self
.assert_timestamps_equal(timestamp2
, n_days_ago
)
1060 self
.assert_timestamps_equal(timestamp3
, n_days_ago
)
1061 self
.assert_soon_after(timestamp4
, current_time
)
1063 def test_update_aging_disabled_in_no_refresh_window(self
):
1064 self
._test
_update
_aging
_disabled
_n
_days
_ago
(4)
1066 def test_update_aging_disabled_on_no_refresh_boundary(self
):
1067 self
._test
_update
_aging
_disabled
_n
_days
_ago
(7)
1069 def test_update_aging_disabled_in_refresh_window(self
):
1070 self
._test
_update
_aging
_disabled
_n
_days
_ago
(9)
1072 def test_update_aging_disabled_beyond_refresh_window(self
):
1073 self
._test
_update
_aging
_disabled
_n
_days
_ago
(16)
1075 def test_update_aging_disabled_in_eighteenth_century(self
):
1076 self
._test
_update
_aging
_disabled
_n
_days
_ago
(100000)
1078 def test_update_aging_disabled_static(self
):
1083 self
.set_aging(False)
1085 current_time
= self
.dns_update_record(name
, txt1
).dwTimeStamp
1086 self
.ldap_update_record(name
, txt1
, dwTimeStamp
=0)
1088 # no change when updating this record
1089 timestamp1
= self
.dns_update_record(name
, txt1
).dwTimeStamp
1090 self
.assertEqual(timestamp1
, 0)
1092 # add another record, which should have the current timestamp
1093 timestamp2
= self
.dns_update_record(name
, txt2
).dwTimeStamp
1094 self
.assert_soon_after(timestamp2
, current_time
)
1096 timestamp1
= self
.get_unique_txt_record(name
, txt1
).dwTimeStamp
1097 self
.assert_soon_after(timestamp1
, current_time
)
1099 # let's repeat that, this time with txt2 existing
1100 timestamp1
= self
.dns_update_record(name
, txt1
).dwTimeStamp
1101 self
.assert_soon_after(timestamp2
, current_time
)
1103 timestamp2
= self
.dns_update_record(name
, txt2
).dwTimeStamp
1104 self
.assert_soon_after(timestamp2
, current_time
)
1106 # delete record2, try again
1107 self
.ldap_delete_record(name
, txt2
)
1108 self
.ldap_update_record(name
, txt1
, dwTimeStamp
=0)
1109 # no change when updating this record
1110 timestamp1
= self
.dns_update_record(name
, txt1
).dwTimeStamp
1111 self
.assertEqual(timestamp1
, 0)
1113 timestamp2
= self
.dns_update_record(name
, txt2
).dwTimeStamp
1114 self
.assertEqual(timestamp2
, 0)
1116 timestamp1
= self
.get_unique_txt_record(name
, txt1
).dwTimeStamp
1117 self
.assertEqual(timestamp1
, 0)
1118 # re-timestamp record2, try again
1119 self
.ldap_update_record(name
, txt2
, dwTimeStamp
=1)
1120 self
.ldap_update_record(name
, txt1
, dwTimeStamp
=0)
1121 # no change when updating this record
1122 timestamp2
= self
.dns_update_record(name
, txt2
).dwTimeStamp
1123 self
.assert_timestamps_equal(timestamp2
, 1)
1125 def test_update_aging_disabled(self
):
1126 # With aging disabled, Windows updates the timestamps of all
1127 # records when one is updated.
1130 txt2
= ['test', 'txt2']
1131 txt3
= ['test', 'txt3']
1135 self
.set_aging(False)
1137 current_time
= self
.dns_update_record(name
, txt1
).dwTimeStamp
1139 # rewind timestamp using ldap
1140 self
.ldap_modify_timestamps(name
, minus_6
)
1141 after_mod
= self
.get_unique_txt_record(name
, txt1
)
1142 six_days_ago
= after_mod
.dwTimeStamp
1143 self
.assert_timestamps_equal(six_days_ago
, current_time
+ minus_6
)
1146 update_timestamp
= self
.dns_update_record(name
, txt1
).dwTimeStamp
1147 self
.assert_timestamps_equal(update_timestamp
, six_days_ago
)
1149 self
.check_query_txt(name
, txt1
, zone
=self
.zone
)
1152 timestamp2
= self
.dns_update_record(name
, txt2
).dwTimeStamp
1153 self
.assert_soon_after(timestamp2
, current_time
)
1155 timestamp1
= self
.get_unique_txt_record(name
, txt1
).dwTimeStamp
1156 # without aging, timestamp1 is changed!!
1157 self
.assert_timestamps_equal(timestamp1
, timestamp2
)
1159 # Set both records back to 8 days ago.
1160 self
.ldap_modify_timestamps(name
, minus_8
)
1162 eight_days_ago
= self
.get_unique_txt_record(name
, txt1
).dwTimeStamp
1163 self
.assert_timestamps_equal(eight_days_ago
, current_time
+ minus_8
)
1165 update2
= self
.dns_update_record(name
, txt2
)
1167 # Without aging on, an update should not change the timestamps.
1168 self
.assert_timestamps_equal(update2
.dwTimeStamp
, eight_days_ago
)
1169 timestamp1
= self
.get_unique_txt_record(name
, txt1
).dwTimeStamp
1170 self
.assert_timestamps_equal(timestamp1
, eight_days_ago
)
1172 # Add another txt record. The new record should have the now
1173 # timestamp, and drag the others up with it.
1174 timestamp3
= self
.dns_update_record(name
, txt3
).dwTimeStamp
1175 self
.assert_soon_after(timestamp3
, current_time
)
1176 timestamp1
= self
.get_unique_txt_record(name
, txt1
).dwTimeStamp
1177 timestamp2
= self
.get_unique_txt_record(name
, txt2
).dwTimeStamp
1178 self
.assert_timestamps_equal(timestamp1
, timestamp3
)
1179 self
.assert_timestamps_equal(timestamp2
, timestamp3
)
1181 hundred_days_ago
= current_time
- 100 * 24
1182 thousand_days_ago
= current_time
- 1000 * 24
1183 record
= self
.ldap_update_record(name
, txt1
,
1184 dwTimeStamp
=hundred_days_ago
)
1185 self
.assert_timestamps_equal(record
.dwTimeStamp
, hundred_days_ago
)
1186 record
= self
.ldap_update_record(name
, txt2
,
1187 dwTimeStamp
=thousand_days_ago
)
1188 self
.assert_timestamps_equal(record
.dwTimeStamp
, thousand_days_ago
)
1190 # update 3, will others change (because beyond RefreshInterval)? yes.
1191 timestamp3
= self
.dns_update_record(name
, txt3
).dwTimeStamp
1192 timestamp1
= self
.get_unique_txt_record(name
, txt1
).dwTimeStamp
1193 timestamp2
= self
.get_unique_txt_record(name
, txt2
).dwTimeStamp
1194 self
.assert_soon_after(timestamp3
, current_time
)
1195 self
.assert_timestamps_equal(timestamp1
, hundred_days_ago
)
1196 self
.assert_timestamps_equal(timestamp2
, thousand_days_ago
)
1198 fifteen_days_ago
= current_time
- 15 * 24
1199 self
.ldap_update_record(name
, txt3
, dwTimeStamp
=fifteen_days_ago
)
1201 timestamp2
= self
.dns_update_record(name
, txt2
).dwTimeStamp
1202 timestamp1
= self
.get_unique_txt_record(name
, txt1
).dwTimeStamp
1203 timestamp3
= self
.get_unique_txt_record(name
, txt3
).dwTimeStamp
1204 # DNS update has no effect because all records are old
1205 self
.assert_timestamps_equal(timestamp2
, thousand_days_ago
)
1206 self
.assert_timestamps_equal(timestamp1
, hundred_days_ago
)
1207 self
.assert_timestamps_equal(timestamp3
, fifteen_days_ago
)
1209 # Does update of old record affect timestamp of refreshable record? No.
1210 self
.ldap_update_record(name
, txt3
, dwTimeStamp
=eight_days_ago
)
1211 timestamp2
= self
.dns_update_record(name
, txt2
).dwTimeStamp
1212 timestamp1
= self
.get_unique_txt_record(name
, txt1
).dwTimeStamp
1213 timestamp3
= self
.get_unique_txt_record(name
, txt3
).dwTimeStamp
1214 # DNS update has no effect because all records are old
1215 self
.assert_timestamps_equal(timestamp2
, thousand_days_ago
)
1216 self
.assert_timestamps_equal(timestamp1
, hundred_days_ago
)
1217 self
.assert_timestamps_equal(timestamp3
, eight_days_ago
)
1219 # RPC zeros timestamp, after which updates won't change it.
1220 # BUT it refreshes all others!
1221 self
.rpc_update_record(name
, txt2
)
1223 timestamp2
= self
.dns_update_record(name
, txt3
).dwTimeStamp
1224 timestamp1
= self
.get_unique_txt_record(name
, txt1
).dwTimeStamp
1225 timestamp2
= self
.get_unique_txt_record(name
, txt2
).dwTimeStamp
1226 self
.assertEqual(timestamp2
, 0)
1227 self
.assert_soon_after(timestamp1
, current_time
)
1228 self
.assert_timestamps_equal(timestamp3
, eight_days_ago
)
1230 def test_update_aging_enabled(self
):
1233 txt2
= ['test', 'txt2']
1234 txt3
= ['test', 'txt3']
1237 self
.set_aging(True)
1239 current_time
= self
.dns_update_record(name
, txt2
).dwTimeStamp
1241 six_days_ago
= current_time
- 6 * 24
1242 eight_days_ago
= current_time
- 8 * 24
1243 fifteen_days_ago
= current_time
- 15 * 24
1244 hundred_days_ago
= current_time
- 100 * 24
1246 self
.ldap_update_record(name
, txt1
, dwTimeStamp
=six_days_ago
)
1248 # with or without aging, a delta of -6 days does not affect
1249 # timestamps, because dwNoRefreshInterval is 7 days.
1250 timestamp1
= self
.dns_update_record(name
, txt1
).dwTimeStamp
1251 timestamp2
= self
.get_unique_txt_record(name
, txt2
).dwTimeStamp
1253 self
.assert_timestamps_equal(timestamp1
, six_days_ago
)
1254 self
.assert_soon_after(timestamp2
, current_time
)
1256 self
.ldap_update_record(name
, txt3
, dwTimeStamp
=eight_days_ago
)
1257 timestamp3
= self
.get_unique_txt_record(name
, txt3
).dwTimeStamp
1258 self
.assert_timestamps_equal(timestamp3
, eight_days_ago
)
1260 # update 1, what happens to 2 and 3? Nothing?
1261 timestamp1
= self
.dns_update_record(name
, txt1
).dwTimeStamp
1262 timestamp2
= self
.get_unique_txt_record(name
, txt2
).dwTimeStamp
1263 timestamp3
= self
.get_unique_txt_record(name
, txt3
).dwTimeStamp
1264 self
.assert_timestamps_equal(timestamp1
, six_days_ago
)
1265 self
.assert_soon_after(timestamp2
, current_time
)
1266 self
.assert_timestamps_equal(timestamp3
, eight_days_ago
)
1268 # now set 1 to 8 days, and we should see changes
1269 self
.ldap_update_record(name
, txt1
, dwTimeStamp
=eight_days_ago
)
1271 # update 1, what happens to 2 and 3? Nothing?
1272 timestamp1
= self
.dns_update_record(name
, txt1
).dwTimeStamp
1273 timestamp2
= self
.get_unique_txt_record(name
, txt2
).dwTimeStamp
1274 timestamp3
= self
.get_unique_txt_record(name
, txt3
).dwTimeStamp
1275 self
.assert_soon_after(timestamp1
, current_time
)
1276 self
.assert_soon_after(timestamp2
, current_time
)
1277 self
.assert_timestamps_equal(timestamp3
, eight_days_ago
)
1279 # next few ones use these numbers
1280 self
.ldap_update_record(name
, txt1
, dwTimeStamp
=fifteen_days_ago
)
1281 self
.ldap_update_record(name
, txt2
, dwTimeStamp
=six_days_ago
)
1282 self
.ldap_update_record(name
, txt3
, dwTimeStamp
=eight_days_ago
)
1284 # change even though 1 is outside the window
1285 timestamp1
= self
.dns_update_record(name
, txt1
).dwTimeStamp
1286 timestamp2
= self
.get_unique_txt_record(name
, txt2
).dwTimeStamp
1287 timestamp3
= self
.get_unique_txt_record(name
, txt3
).dwTimeStamp
1288 self
.assert_soon_after(timestamp1
, current_time
)
1289 self
.assert_timestamps_equal(timestamp2
, six_days_ago
)
1290 self
.assert_timestamps_equal(timestamp3
, eight_days_ago
)
1293 self
.ldap_update_record(name
, txt1
, dwTimeStamp
=fifteen_days_ago
)
1295 # no change, because 2 is outside the window
1296 timestamp2
= self
.dns_update_record(name
, txt2
).dwTimeStamp
1297 timestamp1
= self
.get_unique_txt_record(name
, txt1
).dwTimeStamp
1298 timestamp3
= self
.get_unique_txt_record(name
, txt3
).dwTimeStamp
1299 self
.assert_timestamps_equal(timestamp1
, fifteen_days_ago
)
1300 self
.assert_timestamps_equal(timestamp2
, six_days_ago
)
1301 self
.assert_timestamps_equal(timestamp3
, eight_days_ago
)
1303 # 3 changes, others do not
1304 timestamp3
= self
.dns_update_record(name
, txt3
).dwTimeStamp
1305 timestamp1
= self
.get_unique_txt_record(name
, txt1
).dwTimeStamp
1306 timestamp2
= self
.get_unique_txt_record(name
, txt2
).dwTimeStamp
1307 self
.assert_timestamps_equal(timestamp1
, fifteen_days_ago
)
1308 self
.assert_timestamps_equal(timestamp2
, six_days_ago
)
1309 self
.assert_soon_after(timestamp3
, current_time
)
1311 # reset 3 to 100 days
1312 self
.ldap_update_record(name
, txt3
, dwTimeStamp
=hundred_days_ago
)
1314 # 3 changes, others do not
1315 timestamp3
= self
.dns_update_record(name
, txt3
).dwTimeStamp
1316 timestamp1
= self
.get_unique_txt_record(name
, txt1
).dwTimeStamp
1317 timestamp2
= self
.get_unique_txt_record(name
, txt2
).dwTimeStamp
1318 self
.assert_timestamps_equal(timestamp1
, fifteen_days_ago
)
1319 self
.assert_timestamps_equal(timestamp2
, six_days_ago
)
1320 self
.assert_soon_after(timestamp3
, current_time
)
1322 # reset 1 and 3 to 8 days. does update of 1 affect 3?
1323 self
.ldap_update_record(name
, txt1
, dwTimeStamp
=eight_days_ago
)
1324 self
.ldap_update_record(name
, txt3
, dwTimeStamp
=eight_days_ago
)
1326 # 1 changes, others do not
1327 timestamp1
= self
.dns_update_record(name
, txt1
).dwTimeStamp
1328 timestamp2
= self
.get_unique_txt_record(name
, txt2
).dwTimeStamp
1329 timestamp3
= self
.get_unique_txt_record(name
, txt3
).dwTimeStamp
1330 self
.assert_soon_after(timestamp1
, current_time
)
1331 self
.assert_timestamps_equal(timestamp2
, six_days_ago
)
1332 self
.assert_timestamps_equal(timestamp3
, eight_days_ago
)
1334 # Try an RPC update, zeroing 1 --> what happens to 3?
1335 timestamp1
= self
.rpc_update_record(name
, txt1
).dwTimeStamp
1336 timestamp2
= self
.get_unique_txt_record(name
, txt2
).dwTimeStamp
1337 timestamp3
= self
.get_unique_txt_record(name
, txt3
).dwTimeStamp
1338 self
.assertEqual(timestamp1
, 0)
1339 self
.assert_timestamps_equal(timestamp2
, six_days_ago
)
1340 self
.assert_timestamps_equal(timestamp3
, eight_days_ago
)
1342 # with 2 and 3 at 8 days, does static record change things?
1343 self
.ldap_update_record(name
, txt2
, dwTimeStamp
=eight_days_ago
)
1344 # 2 changes, but to zero!
1345 timestamp2
= self
.dns_update_record(name
, txt2
).dwTimeStamp
1346 timestamp1
= self
.get_unique_txt_record(name
, txt1
).dwTimeStamp
1347 timestamp3
= self
.get_unique_txt_record(name
, txt3
).dwTimeStamp
1348 self
.assert_timestamps_equal(timestamp1
, 0)
1349 self
.assert_timestamps_equal(timestamp2
, 0)
1350 self
.assert_timestamps_equal(timestamp3
, eight_days_ago
)
1352 self
.ldap_update_record(name
, txt2
, dwTimeStamp
=six_days_ago
)
1353 self
.ldap_update_record(name
, txt1
, dwTimeStamp
=3000000)
1354 timestamp1
= self
.get_unique_txt_record(name
, txt1
).dwTimeStamp
1355 self
.assert_timestamps_equal(timestamp1
, 3000000)
1357 # dns update remembers that node is static, even with no
1359 timestamp1
= self
.dns_update_record(name
, txt1
).dwTimeStamp
1360 self
.assertEqual(timestamp1
, 0)
1362 # Add another txt record. The new record should have the now
1363 # timestamp, and the others should remain unchanged.
1364 # BUT somehow record 1 is static!?
1365 timestamp4
= self
.dns_update_record(name
, txt4
).dwTimeStamp
1366 timestamp1
= self
.get_unique_txt_record(name
, txt1
).dwTimeStamp
1367 timestamp2
= self
.get_unique_txt_record(name
, txt2
).dwTimeStamp
1368 timestamp3
= self
.get_unique_txt_record(name
, txt3
).dwTimeStamp
1369 self
.assert_timestamps_equal(timestamp1
, 0)
1370 self
.assert_timestamps_equal(timestamp2
, six_days_ago
)
1371 self
.assert_timestamps_equal(timestamp3
, eight_days_ago
)
1372 self
.assert_timestamps_equal(timestamp4
, 0)
1374 def _test_update_aging_enabled_n_days_ago(self
, n_days
):
1378 delta
= n_days
* -24
1380 self
.set_aging(True)
1381 current_time
= self
.dns_update_record(name
, txt1
).dwTimeStamp
1383 # rewind timestamp using ldap
1384 self
.ldap_modify_timestamps(name
, delta
)
1385 n_days_ago
= self
.get_unique_txt_record(name
, txt1
).dwTimeStamp
1386 self
.assertGreater(current_time
, n_days_ago
)
1388 # update changes timestamp depending on time.
1389 timestamp1
= self
.dns_update_record(name
, txt1
).dwTimeStamp
1391 self
.assert_timestamps_equal(timestamp1
, n_days_ago
)
1393 self
.assert_soon_after(timestamp1
, current_time
)
1395 # add another record, which should have the current timestamp
1396 timestamp2
= self
.dns_update_record(name
, txt2
).dwTimeStamp
1397 self
.assert_soon_after(timestamp2
, current_time
)
1399 # first record should not have changed
1400 timestamp1_b
= self
.get_unique_txt_record(name
, txt1
).dwTimeStamp
1401 self
.assert_timestamps_equal(timestamp1
, timestamp1_b
)
1403 # let's repeat that, this time with txt2 existing
1404 self
.ldap_update_record(name
, txt1
, dwTimeStamp
=n_days_ago
)
1406 timestamp1
= self
.dns_update_record(name
, txt1
).dwTimeStamp
1407 self
.assert_timestamps_equal(timestamp1
, timestamp1_b
)
1409 # this update is not an add. record 2 is already up-to-date
1410 timestamp2
= self
.dns_update_record(name
, txt2
).dwTimeStamp
1411 self
.assert_soon_after(timestamp2
, current_time
)
1413 # now timestamp1 is not changed
1414 timestamp1
= self
.get_unique_txt_record(name
, txt1
).dwTimeStamp
1415 self
.assert_timestamps_equal(timestamp1
, timestamp1_b
)
1417 # delete record2, try again
1418 self
.ldap_delete_record(name
, txt2
)
1419 self
.ldap_update_record(name
, txt1
, dwTimeStamp
=n_days_ago
)
1421 timestamp1
= self
.dns_update_record(name
, txt1
).dwTimeStamp
1423 self
.assert_timestamps_equal(timestamp1
, n_days_ago
)
1425 self
.assert_soon_after(timestamp1
, current_time
)
1427 # here we are re-adding the deleted record
1428 timestamp2
= self
.dns_update_record(name
, txt2
).dwTimeStamp
1429 self
.assert_soon_after(timestamp2
, current_time
)
1431 timestamp1
= self
.get_unique_txt_record(name
, txt1
).dwTimeStamp
1433 # It gets weird HERE.
1434 # note how the SIBLING of the deleted, re-added record differs
1435 # from the sibling of freshly added record, depending on the
1438 self
.assert_timestamps_equal(timestamp1
, n_days_ago
)
1440 self
.assert_timestamps_equal(timestamp1
, timestamp2
)
1442 # re-timestamp record2, try again
1443 self
.ldap_update_record(name
, txt2
, dwTimeStamp
=n_days_ago
)
1444 self
.ldap_update_record(name
, txt1
, dwTimeStamp
=n_days_ago
)
1446 # this should make no difference
1447 timestamp1_b
= self
.dns_update_record(name
, txt1
).dwTimeStamp
1448 self
.assert_timestamps_equal(timestamp1
, timestamp1_b
)
1451 timestamp2
= self
.dns_update_record(name
, txt2
).dwTimeStamp
1452 self
.assert_timestamps_equal(timestamp2
, timestamp1
)
1454 timestamp1
= self
.get_unique_txt_record(name
, txt1
).dwTimeStamp
1455 self
.assert_timestamps_equal(timestamp1
, timestamp2
)
1457 # let's introduce another record
1459 self
.ldap_update_record(name
, txt2
, dwTimeStamp
=n_days_ago
)
1460 self
.ldap_update_record(name
, txt1
, dwTimeStamp
=n_days_ago
)
1462 timestamp3
= self
.dns_update_record(name
, txt3
).dwTimeStamp
1463 self
.assert_soon_after(timestamp3
, current_time
)
1465 timestamp1
= self
.get_unique_txt_record(name
, txt1
).dwTimeStamp
1466 timestamp2
= self
.get_unique_txt_record(name
, txt2
).dwTimeStamp
1468 self
.assert_timestamps_equal(timestamp1
, n_days_ago
)
1469 self
.assert_timestamps_equal(timestamp2
, n_days_ago
)
1471 self
.ldap_delete_record(name
, txt3
)
1472 timestamp3
= self
.dns_update_record(name
, txt3
).dwTimeStamp
1473 self
.assert_soon_after(timestamp3
, current_time
)
1474 timestamp1
= self
.get_unique_txt_record(name
, txt1
).dwTimeStamp
1475 timestamp2
= self
.get_unique_txt_record(name
, txt2
).dwTimeStamp
1477 self
.assert_timestamps_equal(timestamp1
, n_days_ago
)
1478 self
.assert_timestamps_equal(timestamp2
, n_days_ago
)
1482 # Because txt1 is static, txt4 is static
1483 self
.ldap_update_record(name
, txt1
, dwTimeStamp
=0)
1484 self
.ldap_update_record(name
, txt2
, dwTimeStamp
=n_days_ago
)
1485 self
.ldap_update_record(name
, txt3
, dwTimeStamp
=n_days_ago
)
1486 timestamp1
= self
.dns_update_record(name
, txt1
).dwTimeStamp
1487 timestamp2
= self
.get_unique_txt_record(name
, txt2
).dwTimeStamp
1488 timestamp3
= self
.get_unique_txt_record(name
, txt3
).dwTimeStamp
1489 timestamp4
= self
.dns_update_record(name
, txt4
).dwTimeStamp
1491 self
.assert_timestamps_equal(timestamp1
, 0)
1492 self
.assert_timestamps_equal(timestamp2
, n_days_ago
)
1493 self
.assert_timestamps_equal(timestamp3
, n_days_ago
)
1494 self
.assert_timestamps_equal(timestamp4
, 0)
1496 longer_ago
= n_days_ago
// 2
1498 # remove all static records.
1499 self
.ldap_delete_record(name
, txt4
)
1500 self
.ldap_update_record(name
, txt1
, dwTimeStamp
=longer_ago
)
1501 self
.ldap_update_record(name
, txt2
, dwTimeStamp
=n_days_ago
)
1502 self
.ldap_update_record(name
, txt3
, dwTimeStamp
=n_days_ago
)
1503 timestamp1
= self
.get_unique_txt_record(name
, txt1
).dwTimeStamp
1504 self
.assert_timestamps_equal(timestamp1
, longer_ago
)
1506 timestamp4
= self
.dns_update_record(name
, txt4
).dwTimeStamp
1507 timestamp2
= self
.get_unique_txt_record(name
, txt2
).dwTimeStamp
1508 timestamp3
= self
.get_unique_txt_record(name
, txt3
).dwTimeStamp
1509 timestamp1
= self
.get_unique_txt_record(name
, txt1
).dwTimeStamp
1511 # Here, although there is no record from which to get the zero
1512 # timestamp, record 4 does it anyway.
1513 self
.assert_timestamps_equal(timestamp1
, longer_ago
)
1514 self
.assert_timestamps_equal(timestamp2
, n_days_ago
)
1515 self
.assert_timestamps_equal(timestamp3
, n_days_ago
)
1516 self
.assert_timestamps_equal(timestamp4
, 0)
1518 # and now record 1 wants to be static.
1519 self
.ldap_update_record(name
, txt4
, dwTimeStamp
=longer_ago
)
1520 timestamp4
= self
.get_unique_txt_record(name
, txt4
).dwTimeStamp
1521 self
.assert_timestamps_equal(timestamp4
, longer_ago
)
1522 timestamp1
= self
.dns_update_record(name
, txt1
).dwTimeStamp
1523 timestamp4
= self
.get_unique_txt_record(name
, txt4
).dwTimeStamp
1524 self
.assert_timestamps_equal(timestamp1
, 0)
1525 self
.assert_timestamps_equal(timestamp4
, longer_ago
)
1527 def test_update_aging_enabled_in_no_refresh_window(self
):
1528 self
._test
_update
_aging
_enabled
_n
_days
_ago
(4)
1530 def test_update_aging_enabled_on_no_refresh_boundary(self
):
1531 self
._test
_update
_aging
_enabled
_n
_days
_ago
(7)
1533 def test_update_aging_enabled_in_refresh_window(self
):
1534 self
._test
_update
_aging
_enabled
_n
_days
_ago
(9)
1536 def test_update_aging_enabled_beyond_refresh_window(self
):
1537 self
._test
_update
_aging
_enabled
_n
_days
_ago
(16)
1539 def test_update_aging_enabled_in_eighteenth_century(self
):
1540 self
._test
_update
_aging
_enabled
_n
_days
_ago
(100000)
1542 def test_update_static_stickiness(self
):
1549 self
.set_aging(False)
1550 self
.dns_update_record(name
, A
).dwTimeStamp
1551 self
.ldap_update_record(name
, B
, dwTimeStamp
=0)
1552 self
.dns_update_record(name
, B
)
1553 self
.dns_update_record(name
, C
)
1554 ctime
= self
.get_unique_txt_record(name
, C
).dwTimeStamp
1555 self
.assertEqual(ctime
, 0)
1556 btime
= self
.get_unique_txt_record(name
, B
).dwTimeStamp
1557 self
.assertEqual(btime
, 0)
1559 self
.ldap_replace_records(name
, [])
1561 self
.dns_update_record(name
, D
)
1562 dtime
= self
.get_unique_txt_record(name
, D
).dwTimeStamp
1563 self
.assertEqual(dtime
, 0)
1565 def _test_update_timestamp_weirdness(self
, n_days
, aging
=True):
1570 self
.set_aging(aging
)
1572 current_time
= self
.dns_update_record(name
, A
).dwTimeStamp
1574 # rewind timestamp using ldap
1575 self
.ldap_modify_timestamps(name
, n_days
* -24)
1576 n_days_ago
= self
.get_unique_txt_record(name
, A
).dwTimeStamp
1577 time_A
= self
.dns_update_record(name
, A
).dwTimeStamp
1578 # that dns_update should have reset the timestamp ONLY if
1579 # aging is on and the old timestamp is > noRefresh period (7
1581 if n_days
> 7 and aging
:
1582 self
.assert_soon_after(time_A
, current_time
)
1584 self
.assert_timestamps_equal(time_A
, n_days_ago
)
1586 # add another record, which should have the current timestamp
1587 time_B
= self
.dns_update_record(name
, B
).dwTimeStamp
1588 self
.assert_soon_after(time_B
, current_time
)
1590 time_A
= self
.get_unique_txt_record(name
, A
).dwTimeStamp
1591 if aging
and n_days
<= 7:
1592 self
.assert_timestamps_equal(time_A
, n_days_ago
)
1594 self
.assert_soon_after(time_A
, current_time
)
1596 # delete B, try again
1597 self
.ldap_delete_record(name
, B
)
1598 self
.ldap_update_record(name
, A
, dwTimeStamp
=n_days_ago
)
1600 time_A
= self
.dns_update_record(name
, A
).dwTimeStamp
1602 # here we are re-adding the deleted record
1603 time_B
= self
.dns_update_record(name
, B
).dwTimeStamp
1604 self
.assert_soon_after(time_B
, current_time
)
1606 time_A
= self
.get_unique_txt_record(name
, A
).dwTimeStamp
1607 return n_days_ago
, time_A
, time_B
1609 def test_update_timestamp_weirdness_no_refresh_no_aging(self
):
1610 n_days_ago
, time_A
, time_B
= \
1611 self
._test
_update
_timestamp
_weirdness
(5, False)
1612 # the timestamp of the SIBLING of the deleted, re-added record
1613 # differs from the sibling of freshly added record.
1614 self
.assert_timestamps_equal(time_A
, n_days_ago
)
1616 def test_update_timestamp_weirdness_no_refresh_aging(self
):
1617 n_days_ago
, time_A
, time_B
= \
1618 self
._test
_update
_timestamp
_weirdness
(5, True)
1619 # the timestamp of the SIBLING of the deleted, re-added record
1620 # differs from the sibling of freshly added record.
1621 self
.assert_timestamps_equal(time_A
, n_days_ago
)
1623 def test_update_timestamp_weirdness_refresh_no_aging(self
):
1624 n_days_ago
, time_A
, time_B
= \
1625 self
._test
_update
_timestamp
_weirdness
(9, False)
1626 self
.assert_timestamps_equal(time_A
, time_B
)
1628 def test_update_timestamp_weirdness_refresh_aging(self
):
1629 n_days_ago
, time_A
, time_B
= \
1630 self
._test
_update
_timestamp
_weirdness
(9, True)
1631 self
.assert_timestamps_equal(time_A
, time_B
)
1633 def test_aging_refresh(self
):
1634 name
, txt
= 'agingtest', ['test txt']
1637 self
.set_zone_int_params(NoRefreshInterval
=no_refresh
,
1638 RefreshInterval
=refresh
,
1640 before_mod
= self
.dns_update_record(name
, txt
)
1641 start_time
= before_mod
.dwTimeStamp
1643 # go back 86 hours, which is in the no-refresh time (but
1644 # wouldn't be if we had stuck to the default of 168).
1645 self
.ldap_modify_timestamps(name
, -170)
1646 rec
= self
.dns_update_record(name
, txt
)
1647 self
.assert_timestamps_equal(rec
.dwTimeStamp
,
1650 # back to -202 hours, into the refresh zone
1651 # the update should reset the timestamp to now.
1652 self
.ldap_modify_timestamps(name
, -32)
1653 rec
= self
.dns_update_record(name
, txt
)
1654 self
.assert_soon_after(rec
.dwTimeStamp
, start_time
)
1656 # back to -362 hours, beyond the end of the refresh period.
1657 # Actually nothing changes at this time -- we can still
1658 # refresh, but the record is liable for scavenging.
1659 self
.ldap_modify_timestamps(name
, -160)
1660 rec
= self
.dns_update_record(name
, txt
)
1661 self
.assert_soon_after(rec
.dwTimeStamp
, start_time
)
1663 def test_add_no_timestamp(self
):
1664 # check zero timestamp is implicit
1665 self
.set_aging(True)
1666 rec
= self
.ldap_update_record('ldap', 'test')
1667 self
.assertEqual(rec
.dwTimeStamp
, 0)
1668 rec
= self
.rpc_update_record('rpc', 'test')
1669 self
.assertEqual(rec
.dwTimeStamp
, 0)
1671 def test_add_zero_timestamp(self
):
1672 rec
= self
.ldap_update_record('ldap', 'test', dwTimeStamp
=0)
1673 self
.assertEqual(rec
.dwTimeStamp
, 0)
1674 rec
= self
.rpc_update_record('rpc', 'test', dwTimeStamp
=0)
1675 self
.assertEqual(rec
.dwTimeStamp
, 0)
1677 def test_add_update_timestamp(self
):
1678 # LDAP can change timestamp, RPC can't
1679 rec
= self
.ldap_update_record('ldap', 'test', dwTimeStamp
=123456)
1680 self
.assertEqual(rec
.dwTimeStamp
, 123456)
1681 rec
= self
.rpc_update_record('rpc', 'test', dwTimeStamp
=123456)
1682 self
.assertEqual(rec
.dwTimeStamp
, 0)
1683 # second time is a different code path (add vs update)
1684 rec
= self
.rpc_update_record('rpc', 'test', dwTimeStamp
=123456)
1685 self
.assertEqual(rec
.dwTimeStamp
, 0)
1686 # RPC update the one with timestamp, zeroing it.
1687 rec
= self
.rpc_update_record('ldap', 'test', dwTimeStamp
=123456)
1688 self
.assertEqual(rec
.dwTimeStamp
, 0)
1690 def test_add_update_ttl(self
):
1691 # RPC *can* set dwTtlSeconds.
1692 rec
= self
.ldap_update_record('ldap', 'test',
1694 self
.assertEqual(rec
.dwTtlSeconds
, 1234)
1695 rec
= self
.rpc_update_record('rpc', 'test', dwTtlSeconds
=1234)
1696 self
.assertEqual(rec
.dwTtlSeconds
, 1234)
1697 # does update work like add?
1698 rec
= self
.rpc_update_record('rpc', 'test', dwTtlSeconds
=4321)
1699 self
.assertEqual(rec
.dwTtlSeconds
, 4321)
1700 rec
= self
.rpc_update_record('ldap', 'test', dwTtlSeconds
=5678)
1701 self
.assertEqual(rec
.dwTtlSeconds
, 5678)
1703 def test_add_update_ttl_serial(self
):
1704 # when setting dwTtlSeconds, what happens to serial number?
1705 rec
= self
.ldap_update_record('ldap', 'test',
1708 self
.assertEqual(rec
.dwTtlSeconds
, 1234)
1709 self
.assertEqual(rec
.dwSerial
, 123)
1710 rec
= self
.rpc_update_record('rpc', 'test', dwTtlSeconds
=1234)
1711 self
.assertEqual(rec
.dwTtlSeconds
, 1234)
1712 serial
= rec
.dwSerial
1713 self
.assertLess(serial
, 4)
1714 rec
= self
.rpc_update_record('rpc', 'test', dwTtlSeconds
=4321)
1715 self
.assertEqual(rec
.dwTtlSeconds
, 4321)
1716 self
.assertEqual(rec
.dwSerial
, serial
+ 1)
1717 rec
= self
.rpc_update_record('ldap', 'test', dwTtlSeconds
=5678)
1718 self
.assertEqual(rec
.dwTtlSeconds
, 5678)
1719 self
.assertEqual(rec
.dwSerial
, 124)
1721 def test_add_update_dwFlags(self
):
1722 # dwFlags splits into rank and flags.
1723 # according to [MS-DNSP] 2.3.2.2, flags MUST be zero
1724 rec
= self
.ldap_update_record('ldap', 'test', flags
=22222, rank
=222)
1725 self
.assertEqual(rec
.flags
, 22222)
1726 self
.assertEqual(rec
.rank
, 222)
1728 rec
= self
.rpc_update_record('ldap', 'test', dwFlags
=3333333)
1729 # rank != 3333333 & 0xff == 213
1730 self
.assertEqual(rec
.rank
, 240) # RPC fixes rank
1731 self
.assertEqual(rec
.flags
, 0)
1733 self
.assertRaises(OverflowError,
1734 self
.ldap_update_record
,
1735 'ldap', 'test', flags
=777777777, rank
=777)
1737 # reset to no default (rank overflows)
1738 rec
= self
.ldap_update_record('ldap', 'test', flags
=7777, rank
=777)
1739 self
.assertEqual(rec
.flags
, 7777)
1740 self
.assertEqual(rec
.rank
, 9)
1742 # DNS update zeros flags, sets rank to 240 (RANK_ZONE)
1743 rec
= self
.dns_update_record('ldap', 'test', ttl
=999)
1744 self
.assertEqual(rec
.flags
, 0)
1745 self
.assertEqual(rec
.rank
, 240)
1747 rec
= self
.rpc_update_record('ldap', 'test', dwFlags
=321)
1748 self
.assertEqual(rec
.flags
, 0)
1749 self
.assertEqual(rec
.rank
, 240)
1751 # RPC adding a new record: fixed rank, zero flags
1752 rec
= self
.rpc_update_record('ldap', 'test 2', dwFlags
=12345)
1753 self
.assertEqual(rec
.rank
, 240)
1754 self
.assertEqual(rec
.flags
, 0)
1756 def test_add_update_dwReserved(self
):
1757 # RPC does not change dwReserved.
1758 rec
= self
.ldap_update_record('ldap', 'test', dwReserved
=54321)
1759 self
.assertEqual(rec
.dwReserved
, 54321)
1760 rec
= self
.rpc_update_record('rpc', 'test', dwReserved
=54321)
1761 self
.assertEqual(rec
.dwReserved
, 0)
1762 rec
= self
.rpc_update_record('rpc', 'test', dwReserved
=54321)
1763 self
.assertEqual(rec
.dwReserved
, 0)
1764 rec
= self
.rpc_update_record('ldap', 'test', dwReserved
=12345)
1765 self
.assertEqual(rec
.dwReserved
, 54321)
1767 def test_add_update_dwSerial(self
):
1768 # On Windows the RPC record ends up with serial 2, on Samba
1769 # serial 3. Rather than knownfail this, we accept anything
1770 # below 4 (for now).
1771 rec
= self
.ldap_update_record('ldap', 'test', dwSerial
=123)
1772 self
.assertEqual(rec
.dwSerial
, 123)
1773 rec
= self
.rpc_update_record('rpc', 'test', dwSerial
=123)
1774 self
.assertLess(rec
.dwSerial
, 4)
1775 rec
= self
.rpc_update_record('rpc', 'test', dwSerial
=123)
1776 self
.assertLess(rec
.dwSerial
, 4)
1777 rec
= self
.dns_update_record('rpc', 'test')
1778 self
.assertLess(rec
.dwSerial
, 4)
1779 rec
= self
.dns_update_record('dns-0', 'test')
1780 self
.assertLess(rec
.dwSerial
, 5)
1782 rec
= self
.dns_update_record('ldap', 'test')
1783 self
.assertEqual(rec
.dwSerial
, 123)
1784 rec
= self
.rpc_update_record('ldap', 'test', dwSerial
=123)
1785 self
.assertEqual(rec
.dwSerial
, 123)
1786 rec
= self
.ldap_update_record('ldap', 'test', dwSerial
=12)
1787 self
.assertEqual(rec
.dwSerial
, 12)
1788 # when we dns-updated ldap/test, we alerted Windows to 123 as
1789 # a high water mark for the zone. (even though we have since
1790 # dropped the serial to 12, 123 is the base serial for new
1792 rec
= self
.dns_update_record('dns', 'test')
1793 self
.assertEqual(rec
.dwSerial
, 124)
1794 rec
= self
.dns_update_record('dns2', 'test')
1795 self
.assertEqual(rec
.dwSerial
, 125)
1796 rec
= self
.rpc_update_record('rpc2', 'test')
1797 self
.assertEqual(rec
.dwSerial
, 126)
1798 rec
= self
.dns_update_record('dns', 'test 2')
1799 self
.assertEqual(rec
.dwSerial
, 127)
1801 def test_add_update_dwSerial_2(self
):
1802 # On Samba the RPC update resets the serial to a low number,
1803 # while Windows leaves it high.
1804 rec
= self
.ldap_update_record('ldap', 'test', dwSerial
=123)
1805 self
.assertEqual(rec
.dwSerial
, 123)
1806 rec
= self
.rpc_update_record('ldap', 'test', dwSerial
=321)
1807 self
.assertEqual(rec
.dwSerial
, 123)
1808 rec
= self
.dns_update_record('ldap', 'test')
1809 self
.assertEqual(rec
.dwSerial
, 123)
1811 def test_rpc_update_disparate_types(self
):
1812 """Can we use update to replace a TXT with an AAAA?"""
1814 old
= TXTRecord("x")
1815 new
= ARecord("127.0.0.111")
1816 self
.rpc_replace(name
, None, old
)
1817 recs
= self
.ldap_get_records(name
)
1818 self
.assertEqual(len(recs
), 1)
1819 self
.assertEqual(recs
[0].wType
, old
.wType
)
1821 self
.rpc_replace(name
, old
, new
)
1822 recs
= self
.ldap_get_records(name
)
1823 self
.assertEqual(len(recs
), 1)
1824 self
.assertEqual(recs
[0].wType
, new
.wType
)
1826 def test_add_update_many(self
):
1827 # Samba fails often in this set, but we want to see how it
1828 # goes further down, so we print the problems and defer the
1835 nonlocal failures
, total
1839 except self
.failureException
as e
:
1840 from traceback
import format_stack
1841 print(f
"{format_stack()[-2]} {e}\n")
1845 defer_assertEqual
= _defer_wrap(self
.assertEqual
)
1846 defer_assert_timestamp_in_ballpark
= \
1847 _defer_wrap(self
.assert_timestamp_in_ballpark
)
1849 self
.set_aging(False)
1850 rec
= self
.ldap_update_record('ldap', 'test',
1859 self
.assertEqual(rec
.version
, 5) # disobeys request
1860 self
.assertEqual(rec
.rank
, 22)
1861 self
.assertEqual(rec
.flags
, 33)
1862 self
.assertEqual(rec
.dwSerial
, 44)
1863 self
.assertEqual(rec
.dwTtlSeconds
, 55)
1864 self
.assertEqual(rec
.dwReserved
, 66)
1865 self
.assertEqual(rec
.dwTimeStamp
, 77)
1867 rec
= self
.dns_update_record('ldap', 'test', ttl
=999)
1868 self
.assertEqual(rec
.version
, 5)
1869 self
.assertEqual(rec
.rank
, 240) # rank gets fixed by DNS update
1870 defer_assertEqual(rec
.flags
, 0) # flags gets fixed
1871 defer_assertEqual(rec
.dwSerial
, 45) # serial increments
1872 self
.assertEqual(rec
.dwTtlSeconds
, 999) # TTL set
1873 defer_assertEqual(rec
.dwReserved
, 0) # reserved fixed
1874 defer_assert_timestamp_in_ballpark(rec
) # changed on Windows ?!
1876 self
.set_aging(True)
1877 rec
= self
.dns_update_record('ldap', 'test', ttl
=1111)
1878 self
.assertEqual(rec
.version
, 5)
1879 self
.assertEqual(rec
.rank
, 240)
1880 defer_assertEqual(rec
.flags
, 0)
1881 defer_assertEqual(rec
.dwSerial
, 46)
1882 self
.assertEqual(rec
.dwTtlSeconds
, 1111) # TTL set
1883 defer_assertEqual(rec
.dwReserved
, 0)
1884 self
.assert_timestamp_in_ballpark(rec
)
1887 rec
= self
.rpc_update_record('ldap', 'test',
1895 self
.assertEqual(rec
.version
, 5) # no change
1896 self
.assertEqual(rec
.rank
, 240) # no change
1897 defer_assertEqual(rec
.flags
, 0) # no change
1898 defer_assertEqual(rec
.dwSerial
, 47) # Serial increments
1899 self
.assertEqual(rec
.dwTtlSeconds
, 555) # TTL set
1900 defer_assertEqual(rec
.dwReserved
, 0) # no change
1901 self
.assertEqual(rec
.dwTimeStamp
, 0) # timestamp zeroed
1903 # RPC update, using default values
1904 rec
= self
.rpc_update_record('ldap', 'test')
1905 self
.assertEqual(rec
.version
, 5)
1906 self
.assertEqual(rec
.rank
, 240)
1907 defer_assertEqual(rec
.flags
, 0)
1908 defer_assertEqual(rec
.dwSerial
, 48) # serial increments
1909 self
.assertEqual(rec
.dwTtlSeconds
, 900) # TTL changed
1910 defer_assertEqual(rec
.dwReserved
, 0)
1911 self
.assertEqual(rec
.dwTimeStamp
, 0)
1913 self
.set_aging(False)
1914 rec
= self
.dns_update_record('ldap', 'test', ttl
=888)
1915 self
.assertEqual(rec
.version
, 5)
1916 self
.assertEqual(rec
.rank
, 240)
1917 defer_assertEqual(rec
.flags
, 0)
1918 defer_assertEqual(rec
.dwSerial
, 49) # serial increments
1919 self
.assertEqual(rec
.dwTtlSeconds
, 888) # TTL set
1920 defer_assertEqual(rec
.dwReserved
, 0)
1921 self
.assertEqual(rec
.dwTimeStamp
, 0) # timestamp stays zero
1924 self
.fail(f
"failed {failures}/{total} deferred assertions")
1926 def test_static_record_dynamic_update(self
):
1927 """Add a static record, then a dynamic record.
1928 The dynamic record should have a timestamp set."""
1930 txt
= ['static txt']
1931 txt2
= ['dynamic txt']
1932 self
.set_aging(True)
1933 rec
= self
.ldap_update_record(name
, txt
, dwTimeStamp
=0)
1934 rec2
= self
.dns_update_record(name
, txt2
)
1935 self
.assert_timestamp_in_ballpark(rec2
)
1936 ts2
= rec2
.dwTimeStamp
1937 # update the first record. It should stay static (timestamp 0)
1938 rec
= self
.dns_update_record(name
, txt
)
1939 self
.assertEqual(rec
.dwTimeStamp
, 0)
1940 # and rec2 should be unchanged.
1941 self
.assertEqual(rec2
.dwTimeStamp
, ts2
)
1943 def test_dynamic_record_static_update(self
):
1945 txt1
= ['dns update before']
1946 txt2
= ['ldap update']
1947 txt3
= ['dns update after']
1948 self
.set_aging(True)
1950 self
.dns_update_record(name
, txt1
)
1951 self
.ldap_update_record(name
, txt2
)
1952 self
.dns_update_record(name
, txt3
)
1954 recs
= self
.get_rpc_records(name
)
1956 d
= [x
.str for x
in r
.data
.str]
1958 self
.assertNotEqual(r
.dwTimeStamp
, 0)
1960 self
.assertEqual(r
.dwTimeStamp
, 0)
1962 self
.assertNotEqual(r
.dwTimeStamp
, 0)
1964 def test_tombstone_in_hours_and_nttime(self
):
1965 # Until now Samba has measured tombstone timestamps in hours,
1966 # not ten-millionths of a second. After now, we want Samba to
1969 nh
, oh
, nn
, on
, on0
, onf
, nn0
, nnf
, _1601
= 'abcdefgij'
1970 now_hours
= dsdb_dns
.unix_to_dns_timestamp(int(time
.time()))
1971 old_hours
= now_hours
- 24 * 90
1972 now_nttime
= dsdb_dns
.dns_timestamp_to_nt_time(now_hours
)
1973 old_nttime
= dsdb_dns
.dns_timestamp_to_nt_time(old_hours
)
1974 # calculations on hours might be based on the lower 32 bits,
1975 # so we test with these forced to extremes (the maximum change
1976 # is 429 seconds in NTTIME).
1977 old_nttime0
= old_nttime
& 0xffffffff00000000
1978 old_nttimef
= old_nttime |
0xffffffff
1979 now_nttime0
= now_nttime
& 0xffffffff00000000
1980 now_nttimef
= now_nttime |
0xffffffff
1981 self
.dns_tombstone(nh
, epoch_nttime
=now_hours
)
1982 self
.dns_tombstone(oh
, epoch_nttime
=old_hours
)
1983 self
.dns_tombstone(nn
, epoch_nttime
=now_nttime
)
1984 self
.dns_tombstone(on
, epoch_nttime
=old_nttime
)
1985 self
.dns_tombstone(nn0
, epoch_nttime
=now_nttime0
)
1986 self
.dns_tombstone(nnf
, epoch_nttime
=now_nttimef
)
1987 self
.dns_tombstone(on0
, epoch_nttime
=old_nttime0
)
1988 self
.dns_tombstone(onf
, epoch_nttime
=old_nttimef
)
1989 # this is our (arbitrary) threshold that will make us think in
1990 # NTTIME, not hours.
1991 self
.dns_tombstone(_1601
, epoch_nttime
=(10 * 1000 * 1000 + 1))
1994 file_samdb
= get_file_samdb()
1995 except ldb
.LdbError
as e
:
1996 raise AssertionError(
1997 f
"failing because '{e}': this is Windows?") from None
1998 dsdb
._dns
_delete
_tombstones
(file_samdb
)
2000 # nh and nn should not be deleted
2001 for name
in nh
, nn
, nn0
, nnf
:
2002 recs
= self
.ldap_get_records(name
)
2003 self
.assertEqual(len(recs
), 1)
2004 self
.assert_tombstoned(name
, timestamp
=False)
2006 # oh and on should be GONE
2007 for name
in oh
, on
, on0
, onf
, _1601
:
2008 recs
= self
.ldap_get_records(name
)
2009 self
.assertEqual(len(recs
), 0)
2011 def test_dns_query_for_tombstoned_results(self
):
2012 # This one fails on Windows, because the dns cache holds B
2013 # after it has been tombstoned behind its back.
2016 self
.dns_tombstone(A
)
2017 self
.assert_tombstoned(A
)
2018 r
= self
.dns_query(A
, qtype
=dns
.DNS_QTYPE_TXT
)
2019 self
.assertEqual(r
.ancount
, 0)
2021 self
.dns_update_record(B
, B
)
2022 self
.dns_tombstone(B
)
2023 self
.assert_tombstoned(B
)
2024 r
= self
.dns_query(B
, qtype
=dns
.DNS_QTYPE_TXT
)
2025 self
.assertEqual(r
.ancount
, 0)
2027 def test_basic_scavenging(self
):
2028 # NOTE: This one fails on Windows, because the RPC call to
2029 # prompt scavenging is not immediate. On Samba, in the
2030 # testenv, we don't have the RPC call but we can connect to
2031 # the database directly.
2033 # just to be sure we have the right limits.
2034 self
.set_zone_int_params(NoRefreshInterval
=168,
2035 RefreshInterval
=168,
2038 ts1
, ts2
, ts3
, ts4
, ts5
, ts6
= ('1', '2', '3', '4', '5', '6')
2039 self
.dns_update_record(ts1
, ts1
)
2040 self
.dns_update_record(ts2
, ts2
)
2041 # ts2 is tombstoned and timestamped in 1981
2042 self
.dns_tombstone(ts2
)
2043 # ts3 is tombstoned and timestamped in the future
2044 self
.dns_tombstone(ts3
, epoch_hours
=(DNS_TIMESTAMP_2101
- 1))
2045 # ts4 is tombstoned and timestamped in the past
2046 self
.dns_tombstone(ts4
, epoch_hours
=1111111)
2047 # ts5 is tombstoned in the past and timestamped in the future
2048 self
.dns_tombstone(ts5
, epoch_hours
=5555555, epoch_nttime
=int(1e10
))
2050 # ts2 and ts3 should now be tombstoned.
2051 self
.assert_tombstoned(ts2
)
2052 self
.assert_tombstoned(ts3
)
2054 # let's un-tombstone ts2
2055 # ending up with dnsTombstoned: FALSE in Samba
2056 # and no dNSTombstoned in Windows.
2057 self
.dns_update_record(ts2
, "ts2 untombstoned")
2058 ts2_node
= self
.get_one_node(ts2
)
2059 ts2_tombstone
= ts2_node
.get("dNSTombstoned")
2060 if ts2_tombstone
is not None:
2061 self
.assertEqual(ts2_tombstone
[0], b
"FALSE")
2063 self
.assert_tombstoned(ts2
, tombstoned
=False)
2065 r
= self
.dns_update_record(ts6
, ts6
)
2067 # put some records into the death zone.
2068 self
.ldap_modify_timestamps(ts1
, -15 * 24)
2069 self
.ldap_modify_timestamps(ts2
, -14 * 24 - 2)
2070 self
.ldap_modify_timestamps(ts6
, -14 * 24 + 2)
2072 # ts1 will be saved by this record
2073 self
.dns_update_record(ts1
, "another record")
2076 # Tell the server to clean-up records.
2077 # This is how it *should* work on Windows:
2078 self
.rpc_conn
.DnssrvOperation2(
2079 dnsserver
.DNS_CLIENT_VERSION_LONGHORN
,
2085 dnsserver
.DNSSRV_TYPEID_NULL
,
2087 # Samba won't get here (NOT_IMPLEMENTED error)
2088 # wait for Windows to do its cleanup.
2090 except WERRORError
as e
:
2091 if e
.args
[0] == werror
.WERR_CALL_NOT_IMPLEMENTED
:
2092 # This is the Samba way, talking to the file directly,
2093 # as if we were the server process. The direct
2094 # connection is needed because the tombstoning search
2095 # involves a magic system only filter.
2096 file_samdb
= get_file_samdb()
2097 dsdb
._scavenge
_dns
_records
(file_samdb
)
2098 dsdb
._dns
_delete
_tombstones
(file_samdb
)
2102 # Now what we should have:
2103 # ts1: alive: the old record is deleted, the new one not.
2106 # ts4: deleted. gone.
2107 # ts5: deleted. timestamp affects tombstoning, but not deletion.
2110 # We order our assertions to make the windows test
2111 # fail as late as possible (on ts4, ts5, ts2).
2112 r
= self
.get_unique_txt_record(ts1
, ["another record"])
2113 self
.assertIsNotNone(r
)
2114 r
= self
.get_unique_txt_record(ts6
, [ts6
])
2115 self
.assertIsNotNone(r
)
2117 self
.assert_tombstoned(ts3
)
2119 n
= self
.get_one_node(ts4
)
2120 self
.assertIsNone(n
)
2121 n
= self
.get_one_node(ts5
)
2122 self
.assertIsNone(n
)
2124 self
.assert_tombstoned(ts2
)
2126 def test_samba_scavenging(self
):
2127 # We expect this one to fail on Windows, because scavenging
2128 # and tombstoning cannot be performed on demand.
2131 file_samdb
= get_file_samdb()
2132 except ldb
.LdbError
as e
:
2133 raise AssertionError(
2134 f
"failing because '{e}': this is Windows?") from None
2136 # let's try different limits.
2137 self
.set_zone_int_params(NoRefreshInterval
=30,
2141 now
= dsdb_dns
.unix_to_dns_timestamp(int(time
.time()))
2144 # A has current time
2145 # B has safe, non-updateable time
2148 atime
= self
.dns_update_record(A
, A
).dwTimeStamp
2149 btime
= self
.ldap_update_record(B
, B
, dwTimeStamp
=now
-20).dwTimeStamp
2150 ctime
= self
.ldap_update_record(C
, C
, dwTimeStamp
=now
-40).dwTimeStamp
2151 dtime
= self
.ldap_update_record(D
, D
, dwTimeStamp
=now
-60).dwTimeStamp
2152 self
.assert_soon_after(atime
, now
)
2153 self
.assert_timestamps_equal(btime
, now
-20)
2154 self
.assert_timestamps_equal(ctime
, now
-40)
2155 self
.assert_timestamps_equal(dtime
, now
-60)
2157 dsdb
._scavenge
_dns
_records
(file_samdb
)
2159 # D should be gone (tombstoned)
2160 r
= self
.get_unique_txt_record(D
, D
)
2161 self
.assertIsNone(r
)
2162 r
= self
.dns_query(D
, qtype
=dns
.DNS_QTYPE_TXT
)
2163 self
.assertEqual(r
.ancount
, 0)
2164 recs
= self
.ldap_get_records(D
)
2165 self
.assertEqual(len(recs
), 1)
2166 self
.assert_tombstoned(D
)
2169 atime
= self
.get_unique_txt_record(A
, A
).dwTimeStamp
2170 btime
= self
.get_unique_txt_record(B
, B
).dwTimeStamp
2171 ctime
= self
.get_unique_txt_record(C
, C
).dwTimeStamp
2172 self
.assert_soon_after(atime
, now
)
2173 self
.assert_timestamps_equal(btime
, now
-20)
2174 self
.assert_timestamps_equal(ctime
, now
-40)
2176 btime
= self
.dns_update_record(B
, B
).dwTimeStamp
2177 ctime
= self
.dns_update_record(C
, C
).dwTimeStamp
2178 self
.assert_timestamps_equal(btime
, now
-40)
2179 self
.assert_soon_after(ctime
, now
)
2181 # after this, D *should* still be a tombstone, because its
2182 # tombstone timestamp is not very old.
2183 dsdb
._dns
_delete
_tombstones
(file_samdb
)
2184 recs
= self
.ldap_get_records(D
)
2185 self
.assertEqual(len(recs
), 1)
2186 self
.assert_tombstoned(D
)
2188 # Let's delete C using rpc, and ensure it survives dns_delete_tombstones
2189 self
.rpc_delete_txt(C
, C
)
2190 recs
= self
.ldap_get_records(C
)
2191 self
.assertEqual(len(recs
), 1)
2192 self
.assert_tombstoned(C
)
2193 dsdb
._dns
_delete
_tombstones
(file_samdb
)
2194 recs
= self
.ldap_get_records(C
)
2195 self
.assertEqual(len(recs
), 1)
2196 self
.assert_tombstoned(C
)
2198 # now let's wind A and B back to either side of the two week
2199 # threshold. A should survive, B should not.
2200 self
.dns_tombstone(A
, (now
- 166))
2201 self
.dns_tombstone(B
, (now
- 170))
2202 dsdb
._dns
_delete
_tombstones
(file_samdb
)
2204 recs
= self
.ldap_get_records(A
)
2205 self
.assertEqual(len(recs
), 1)
2206 self
.assert_tombstoned(A
)
2208 recs
= self
.ldap_get_records(B
)
2209 self
.assertEqual(len(recs
), 0)
2211 def _test_A_and_AAAA_records(self
, A
, B
, a_days
, b_days
, aging
):
2212 self
.set_aging(aging
)
2215 now
= dsdb_dns
.unix_to_dns_timestamp(int(time
.time()))
2216 a_initial
= now
- 24 * a_days
2217 b_initial
= now
- 24 * b_days
2219 self
.dns_update_non_text(name
, A
)
2220 self
.ldap_modify_timestamps(name
, a_days
* -24)
2222 rec_a
= self
.get_unique_ip_record(name
, A
)
2223 rec_b
= self
.add_ip_record(name
, B
, dwTimeStamp
=b_initial
)
2225 self
.assert_timestamps_equal(rec_a
, a_initial
)
2226 self
.assert_timestamps_equal(rec_b
, b_initial
)
2228 # touch the A record.
2229 self
.dns_update_non_text(name
, A
)
2231 # check the A timestamp, depending on norefresh
2232 rec_a
= self
.get_unique_ip_record(name
, A
)
2233 if aging
and a_days
> 7:
2235 self
.assert_soon_after(rec_a
, now
)
2237 # when we have NO aging and are in the refresh window, the
2238 # timestamp now reads as a_initial, but will become now
2239 # after we manipulate B for a bit.
2241 self
.assert_timestamps_equal(rec_a
, a_initial
)
2244 self
.assert_timestamps_equal(rec_a
, a_initial
)
2246 # B timestamp should be unchanged?
2247 rec_b
= self
.get_unique_ip_record(name
, B
)
2248 self
.assert_timestamps_equal(rec_b
, b_initial
)
2250 # touch the B record.
2251 self
.dns_update_non_text(name
, B
)
2253 # check the B timestamp
2254 rec_b
= self
.get_unique_ip_record(name
, B
)
2256 self
.windows_variation(
2257 self
.assert_soon_after
, rec_b
, now
,
2258 msg
="windows updates non-aging, samba does not")
2260 self
.assert_soon_after(rec_b
, now
)
2263 rec_b
= self
.add_ip_record(name
, B
, dwTimeStamp
=b_initial
)
2265 # NOW rec A might have changed! with no aging, and out of refresh.
2266 rec_a
= self
.get_unique_ip_record(name
, A
)
2267 self
.assert_timestamps_equal(rec_a
, time_a
)
2269 self
.dns_update_non_text(name
, A
)
2271 rec_a
= self
.get_unique_ip_record(name
, B
)
2272 self
.assert_timestamps_equal(rec_b
, b_initial
)
2275 _
, wtype
= guess_wtype(A
)
2276 self
.ldap_delete_record(name
, A
, wtype
=wtype
)
2279 self
.dns_update_non_text(name
, A
)
2281 rec_a
= self
.get_unique_ip_record(name
, A
)
2282 self
.assert_soon_after(rec_a
, now
)
2284 rec_b
= self
.get_unique_ip_record(name
, B
)
2285 self
.assert_timestamps_equal(rec_b
, b_initial
)
2287 def test_A_5_days_AAAA_5_days_aging(self
):
2288 self
._test
_A
_and
_AAAA
_records
(IPv4_ADDR
, IPv6_ADDR
, 5, 5, aging
=True)
2290 def test_A_5_days_AAAA_5_days_no_aging(self
):
2291 self
._test
_A
_and
_AAAA
_records
(IPv4_ADDR
, IPv6_ADDR
, 5, 5, aging
=False)
2293 def test_A_5_days_AAAA_10_days_aging(self
):
2294 self
._test
_A
_and
_AAAA
_records
(IPv4_ADDR
, IPv6_ADDR
, 5, 10, aging
=True)
2296 def test_A_5_days_AAAA_10_days_no_aging(self
):
2297 self
._test
_A
_and
_AAAA
_records
(IPv4_ADDR
, IPv6_ADDR
, 5, 10, aging
=False)
2299 def test_A_10_days_AAAA_5_days_aging(self
):
2300 self
._test
_A
_and
_AAAA
_records
(IPv4_ADDR
, IPv6_ADDR
, 10, 5, aging
=True)
2302 def test_A_10_days_AAAA_5_days_no_aging(self
):
2303 self
._test
_A
_and
_AAAA
_records
(IPv4_ADDR
, IPv6_ADDR
, 10, 5, aging
=False)
2305 def test_A_10_days_AAAA_9_days_aging(self
):
2306 self
._test
_A
_and
_AAAA
_records
(IPv4_ADDR
, IPv6_ADDR
, 10, 9, aging
=True)
2308 def test_A_9_days_AAAA_10_days_no_aging(self
):
2309 self
._test
_A
_and
_AAAA
_records
(IPv4_ADDR
, IPv6_ADDR
, 9, 10, aging
=False)
2311 def test_A_20_days_AAAA_2_days_aging(self
):
2312 self
._test
_A
_and
_AAAA
_records
(IPv4_ADDR
, IPv6_ADDR
, 20, 2, aging
=True)
2314 def test_A_6_days_AAAA_40_days_no_aging(self
):
2315 self
._test
_A
_and
_AAAA
_records
(IPv4_ADDR
, IPv6_ADDR
, 6, 40, aging
=False)
2317 def test_A_5_days_A_5_days_aging(self
):
2318 self
._test
_A
_and
_AAAA
_records
(IPv4_ADDR
, IPv4_ADDR_2
, 5, 5, aging
=True)
2320 def test_A_5_days_A_10_days_no_aging(self
):
2321 self
._test
_A
_and
_AAAA
_records
(IPv4_ADDR
, IPv4_ADDR_2
, 5, 10, aging
=False)
2323 def test_AAAA_5_days_AAAA_6_days_aging(self
):
2324 self
._test
_A
_and
_AAAA
_records
(IPv6_ADDR
, IPv6_ADDR_2
, 5, 6, aging
=True)
2326 def test_AAAA_5_days_AAAA_6_days_no_aging(self
):
2327 self
._test
_A
_and
_AAAA
_records
(IPv6_ADDR
, IPv6_ADDR_2
, 5, 6, aging
=False)
2329 def _test_multi_records_delete(self
, aging
):
2330 # Batch deleting a type doesn't update other types timestamps.
2331 self
.set_aging(aging
)
2334 now
= dsdb_dns
.unix_to_dns_timestamp(int(time
.time()))
2336 back_5_days
= now
- 5 * 24
2337 back_10_days
= now
- 10 * 24
2338 back_25_days
= now
- 25 * 24
2342 '2.2.2.2': back_5_days
,
2343 '3.3.3.3': back_10_days
,
2348 '::3': back_25_days
,
2357 # For windows, if we don't DNS update something, it won't know
2359 self
.dns_update_record(name
, '3')
2361 for k
, v
in ip4s
.items():
2362 r
= self
.add_ip_record(name
, k
, wtype
=dns
.DNS_QTYPE_A
, dwTimeStamp
=v
)
2364 for k
, v
in ip6s
.items():
2365 r
= self
.add_ip_record(name
, k
, wtype
=dns
.DNS_QTYPE_AAAA
, dwTimeStamp
=v
)
2367 for k
, v
in txts
.items():
2368 r
= self
.ldap_update_record(name
, k
, dwTimeStamp
=v
)
2370 self
.dns_delete_type(name
, dnsp
.DNS_TYPE_A
)
2372 r
= self
.dns_query(name
, dns
.DNS_QTYPE_A
)
2373 self
.assertEqual(r
.ancount
, 0)
2375 r
= self
.dns_query(name
, dns
.DNS_QTYPE_TXT
)
2376 self
.assertEqual(r
.ancount
, 3)
2377 rset
= set(x
.rdata
.txt
.str[0] for x
in r
.answers
)
2378 self
.assertEqual(rset
, set(txts
))
2380 r
= self
.dns_query(name
, dns
.DNS_QTYPE_AAAA
)
2381 self
.assertEqual(r
.ancount
, 3)
2382 rset
= set(ipv6_normalise(x
.rdata
) for x
in r
.answers
)
2383 self
.assertEqual(rset
, set(ip6s
))
2385 recs
= self
.ldap_get_records(name
)
2386 self
.assertEqual(len(recs
), 6)
2388 if r
.wType
== dns
.DNS_QTYPE_AAAA
:
2389 k
= ipv6_normalise(r
.data
)
2391 elif r
.wType
== dns
.DNS_QTYPE_TXT
:
2395 self
.fail(f
"unexpected wType {r.wType}")
2397 self
.assert_timestamps_equal(r
.dwTimeStamp
, expected
)
2399 def test_multi_records_delete_aging(self
):
2400 self
._test
_multi
_records
_delete
(True)
2402 def test_multi_records_delete_no_aging(self
):
2403 self
._test
_multi
_records
_delete
(False)
2405 def _test_dns_delete_times(self
, n_days
, aging
=True):
2406 # In these tests, Windows replaces the records with
2407 # tombstones, while Samba just removes them. Both are
2408 # reasonable approaches (there is no reanimation pathway for
2409 # tombstones), but this means self.ldap_get_records() gets
2410 # different numbers for each. So we use
2411 # self.ldap_get_non_tombstoned_record().
2417 self
.set_aging(aging
)
2418 now
= dsdb_dns
.unix_to_dns_timestamp(int(time
.time()))
2419 n_days_ago
= max(now
- n_days
* 24, 0)
2421 self
.dns_update_record(name
, A
)
2422 self
.ldap_update_record(name
, A
, dwTimeStamp
=n_days_ago
)
2423 self
.ldap_update_record(name
, B
, dwTimeStamp
=n_days_ago
)
2424 self
.ldap_update_record(name
, C
, dwTimeStamp
=n_days_ago
)
2425 self
.dns_update_record(name
, D
)
2426 r
= self
.dns_query(name
, dns
.DNS_QTYPE_TXT
)
2427 rset
= set(x
.rdata
.txt
.str[0] for x
in r
.answers
)
2428 self
.assertEqual(rset
, set('ABCD'))
2430 atime
= self
.get_unique_txt_record(name
, A
).dwTimeStamp
2431 btime
= self
.get_unique_txt_record(name
, B
).dwTimeStamp
2432 ctime
= self
.get_unique_txt_record(name
, C
).dwTimeStamp
2433 dtime
= self
.get_unique_txt_record(name
, D
).dwTimeStamp
2434 recs
= self
.ldap_get_records(name
)
2435 self
.assertEqual(len(recs
), 4)
2436 r
= self
.dns_query(name
, dns
.DNS_QTYPE_TXT
)
2437 rset
= set(x
.rdata
.txt
.str[0] for x
in r
.answers
)
2438 self
.assertEqual(rset
, set('ABCD'))
2440 self
.assert_timestamps_equal(dtime
, self
.get_unique_txt_record(name
, D
))
2442 self
.dns_delete(name
, D
)
2443 self
.assert_timestamps_equal(atime
, self
.get_unique_txt_record(name
, A
))
2444 self
.assert_timestamps_equal(btime
, self
.get_unique_txt_record(name
, B
))
2445 self
.assert_timestamps_equal(ctime
, self
.get_unique_txt_record(name
, C
))
2446 recs
= self
.ldap_get_non_tombstoned_records(name
)
2447 self
.assertEqual(len(recs
), 3)
2448 r
= self
.dns_query(name
, dns
.DNS_QTYPE_TXT
)
2449 rset
= set(x
.rdata
.txt
.str[0] for x
in r
.answers
)
2450 self
.assertEqual(rset
, set('ABC'))
2452 self
.rpc_delete_txt(name
, C
)
2453 self
.assert_timestamps_equal(atime
, self
.get_unique_txt_record(name
, A
))
2454 self
.assert_timestamps_equal(btime
, self
.get_unique_txt_record(name
, B
))
2455 recs
= self
.ldap_get_non_tombstoned_records(name
)
2456 self
.assertEqual(len(recs
), 2)
2457 r
= self
.dns_query(name
, dns
.DNS_QTYPE_TXT
)
2458 rset
= set(x
.rdata
.txt
.str[0] for x
in r
.answers
)
2459 self
.assertEqual(rset
, set('AB'))
2461 self
.dns_delete(name
, A
)
2462 self
.assert_timestamps_equal(btime
, self
.get_unique_txt_record(name
, B
))
2463 recs
= self
.ldap_get_records(name
)
2464 self
.assertEqual(len(recs
), 1)
2465 r
= self
.dns_query(name
, dns
.DNS_QTYPE_TXT
)
2466 rset
= set(x
.rdata
.txt
.str[0] for x
in r
.answers
)
2467 self
.assertEqual(rset
, {'B'})
2469 self
.dns_delete(name
, B
)
2470 recs
= self
.ldap_get_non_tombstoned_records(name
)
2471 # Windows leaves the node with zero records. Samba ends up
2473 self
.assertEqual(len(recs
), 0)
2474 r
= self
.dns_query(name
, dns
.DNS_QTYPE_TXT
)
2475 rset
= set(x
.rdata
.txt
.str[0] for x
in r
.answers
)
2476 self
.assertEqual(len(rset
), 0)
2478 def test_dns_delete_times_5_days_aging(self
):
2479 self
._test
_dns
_delete
_times
(5, True)
2481 def test_dns_delete_times_11_days_aging(self
):
2482 self
._test
_dns
_delete
_times
(11, True)
2484 def test_dns_delete_times_366_days_aging(self
):
2485 self
._test
_dns
_delete
_times
(366, True)
2487 def test_dns_delete_times_static_aging(self
):
2488 self
._test
_dns
_delete
_times
(1e10
, True)
2490 def test_dns_delete_times_5_days_no_aging(self
):
2491 self
._test
_dns
_delete
_times
(5, False)
2493 def test_dns_delete_times_11_days_no_aging(self
):
2494 self
._test
_dns
_delete
_times
(11, False)
2496 def test_dns_delete_times_366_days_no_aging(self
):
2497 self
._test
_dns
_delete
_times
(366, False)
2499 def test_dns_delete_times_static_no_aging(self
):
2500 self
._test
_dns
_delete
_times
(1e10
, False)
2502 def _test_dns_delete_simple(self
, a_days
, b_days
, aging
=True, touch
=False):
2503 # Here we show that with aging enabled, the timestamp of
2504 # sibling records is *not* modified when a record is deleted.
2506 # With aging disabled, it *is* modified, if the dns server has
2507 # seen it updated before ldap set the time (that is, probably
2508 # the dns server overwrites AD). This happens even if AD
2509 # thinks the record is static.
2513 self
.set_aging(aging
)
2514 now
= dsdb_dns
.unix_to_dns_timestamp(int(time
.time()))
2515 a_days_ago
= max(now
- a_days
* 24, 0)
2516 b_days_ago
= max(now
- b_days
* 24, 0)
2519 self
.dns_update_record(name
, A
)
2520 self
.dns_update_record(name
, B
)
2522 self
.ldap_update_record(name
, A
, dwTimeStamp
=a_days_ago
)
2523 self
.ldap_update_record(name
, B
, dwTimeStamp
=b_days_ago
)
2525 atime
= self
.get_unique_txt_record(name
, A
).dwTimeStamp
2527 self
.dns_delete(name
, B
)
2528 if not aging
and touch
:
2529 # this resets the timestamp even if it is a static record.
2530 self
.assert_soon_after(self
.get_unique_txt_record(name
, A
), now
)
2532 self
.assert_timestamps_equal(self
.get_unique_txt_record(name
, A
), atime
)
2534 def test_dns_delete_simple_2_3_days_aging(self
):
2535 self
._test
_dns
_delete
_simple
(2, 3, True)
2537 def test_dns_delete_simple_2_3_days_no_aging(self
):
2538 self
._test
_dns
_delete
_simple
(2, 3, False)
2540 def test_dns_delete_simple_2_13_days_aging(self
):
2541 self
._test
_dns
_delete
_simple
(2, 13, True)
2543 def test_dns_delete_simple_2_13_days_no_aging(self
):
2544 self
._test
_dns
_delete
_simple
(2, 13, False)
2546 def test_dns_delete_simple_12_13_days_aging(self
):
2547 self
._test
_dns
_delete
_simple
(12, 13, True)
2549 def test_dns_delete_simple_12_13_days_no_aging(self
):
2550 self
._test
_dns
_delete
_simple
(12, 13, False)
2552 def test_dns_delete_simple_112_113_days_aging(self
):
2553 self
._test
_dns
_delete
_simple
(112, 113, True)
2555 def test_dns_delete_simple_112_113_days_no_aging(self
):
2556 self
._test
_dns
_delete
_simple
(112, 113, False)
2558 def test_dns_delete_simple_0_113_days_aging(self
):
2559 # 1e9 hours ago evaluates to 0, i.e static
2560 self
._test
_dns
_delete
_simple
(1e9
, 113, True)
2562 def test_dns_delete_simple_0_113_days_no_aging(self
):
2563 self
._test
_dns
_delete
_simple
(1e9
, 113, False)
2565 def test_dns_delete_simple_0_0_days_aging(self
):
2566 self
._test
_dns
_delete
_simple
(1e9
, 1e9
, True)
2568 def test_dns_delete_simple_0_0_days_no_aging(self
):
2569 self
._test
_dns
_delete
_simple
(1e9
, 1e9
, False)
2571 def test_dns_delete_simple_10_0_days_aging(self
):
2572 self
._test
_dns
_delete
_simple
(10, 1e9
, True)
2574 def test_dns_delete_simple_10_0_days_no_aging(self
):
2575 self
._test
_dns
_delete
_simple
(10, 1e9
, False)
2577 def test_dns_delete_simple_2_3_days_aging_touch(self
):
2578 self
._test
_dns
_delete
_simple
(2, 3, True, True)
2580 def test_dns_delete_simple_2_3_days_no_aging_touch(self
):
2581 self
._test
_dns
_delete
_simple
(2, 3, False, True)
2583 def test_dns_delete_simple_2_13_days_aging_touch(self
):
2584 self
._test
_dns
_delete
_simple
(2, 13, True, True)
2586 def test_dns_delete_simple_2_13_days_no_aging_touch(self
):
2587 self
._test
_dns
_delete
_simple
(2, 13, False, True)
2589 def test_dns_delete_simple_12_13_days_aging_touch(self
):
2590 self
._test
_dns
_delete
_simple
(12, 13, True, True)
2592 def test_dns_delete_simple_12_13_days_no_aging_touch(self
):
2593 self
._test
_dns
_delete
_simple
(12, 13, False, True)
2595 def test_dns_delete_simple_112_113_days_aging_touch(self
):
2596 self
._test
_dns
_delete
_simple
(112, 113, True, True)
2598 def test_dns_delete_simple_112_113_days_no_aging_touch(self
):
2599 self
._test
_dns
_delete
_simple
(112, 113, False, True)
2601 def test_dns_delete_simple_0_113_days_aging_touch(self
):
2602 # 1e9 hours ago evaluates to 0, i.e static
2603 self
._test
_dns
_delete
_simple
(1e9
, 113, True, True)
2605 def test_dns_delete_simple_0_113_days_no_aging_touch(self
):
2606 self
._test
_dns
_delete
_simple
(1e9
, 113, False, True)
2608 def test_dns_delete_simple_0_0_days_aging_touch(self
):
2609 self
._test
_dns
_delete
_simple
(1e9
, 1e9
, True, True)
2611 def test_dns_delete_simple_0_0_days_no_aging_touch(self
):
2612 self
._test
_dns
_delete
_simple
(1e9
, 1e9
, False, True)
2614 def test_dns_delete_simple_10_0_days_aging_touch(self
):
2615 self
._test
_dns
_delete
_simple
(10, 1e9
, True, True)
2617 def test_dns_delete_simple_10_0_days_no_aging_touch(self
):
2618 self
._test
_dns
_delete
_simple
(10, 1e9
, False, True)
2620 def windows_variation(self
, fn
, *args
, msg
=None, **kwargs
):
2623 except AssertionError as e
:
2624 print("Expected success on Windows only, failed as expected:\n" +
2627 print(c_RED("known Windows failure"))
2629 print(c_DARK_YELLOW(msg
))
2630 print("Expected success on Windows:\n" +
2631 c_GREEN(f
"{fn.__name__} {args} {kwargs}"))
2633 def _test_dns_add_sibling(self
, a_days
, refresh
, aging
=True, touch
=False):
2634 # Here we show that with aging enabled, the timestamp of
2635 # sibling records *is* modified when a record is added.
2637 # With aging disabled, it *is* modified, if the dns server has
2638 # seen it updated before ldap set the time (that is, probably
2639 # the dns server overwrites AD). This happens even if AD
2640 # thinks the record is static.
2644 self
.set_zone_int_params(RefreshInterval
=int(refresh
),
2645 NoRefreshInterval
=7,
2648 now
= dsdb_dns
.unix_to_dns_timestamp(int(time
.time()))
2649 a_days_ago
= max(now
- a_days
* 24, 0)
2652 self
.dns_update_record(name
, A
)
2654 self
.ldap_update_record(name
, A
, dwTimeStamp
=a_days_ago
)
2656 atime
= self
.get_unique_txt_record(name
, A
).dwTimeStamp
2658 self
.dns_update_record(name
, B
)
2659 a_rec
= self
.get_unique_txt_record(name
, A
)
2660 if not aging
and touch
:
2661 # On Windows, this resets the timestamp even if it is a
2662 # static record, though in that case it may be a
2663 # transitory effect of the DNS cache. We will insist on
2664 # the Samba behaviour of not changing (that is
2665 # un-static-ing) a zero timestamp, because that is the
2668 self
.windows_variation(
2669 self
.assert_soon_after
, a_rec
, now
,
2670 msg
="Windows resets static siblings (cache effect?)")
2671 self
.assert_timestamps_equal(a_rec
, 0)
2673 self
.assert_soon_after(a_rec
, now
)
2675 self
.assert_timestamps_equal(a_rec
, atime
)
2677 b_rec
= self
.get_unique_txt_record(name
, B
)
2678 self
.assert_soon_after(b_rec
, now
)
2680 def test_dns_add_sibling_2_7_days_aging(self
):
2681 self
._test
_dns
_add
_sibling
(2, 7, True)
2683 def test_dns_add_sibling_2_7_days_no_aging(self
):
2684 self
._test
_dns
_add
_sibling
(2, 7, False)
2686 def test_dns_add_sibling_12_7_days_aging(self
):
2687 self
._test
_dns
_add
_sibling
(12, 7, True)
2689 def test_dns_add_sibling_12_7_days_no_aging(self
):
2690 self
._test
_dns
_add
_sibling
(12, 7, False)
2692 def test_dns_add_sibling_12_3_days_aging(self
):
2693 self
._test
_dns
_add
_sibling
(12, 3, True)
2695 def test_dns_add_sibling_12_3_days_no_aging(self
):
2696 self
._test
_dns
_add
_sibling
(12, 3, False)
2698 def test_dns_add_sibling_112_7_days_aging(self
):
2699 self
._test
_dns
_add
_sibling
(112, 7, True)
2701 def test_dns_add_sibling_112_7_days_no_aging(self
):
2702 self
._test
_dns
_add
_sibling
(112, 7, False)
2704 def test_dns_add_sibling_12_113_days_aging(self
):
2705 self
._test
_dns
_add
_sibling
(12, 113, True)
2707 def test_dns_add_sibling_12_113_days_no_aging(self
):
2708 self
._test
_dns
_add
_sibling
(12, 113, False)
2710 def test_dns_add_sibling_0_7_days_aging(self
):
2711 # 1e9 days ago evaluates to 0, i.e static
2712 self
._test
_dns
_add
_sibling
(1e9
, 7, True)
2714 def test_dns_add_sibling_0_7_days_no_aging(self
):
2715 self
._test
_dns
_add
_sibling
(1e9
, 7, False)
2717 def test_dns_add_sibling_0_0_days_aging(self
):
2718 self
._test
_dns
_add
_sibling
(1e9
, 0, True)
2720 def test_dns_add_sibling_0_0_days_no_aging(self
):
2721 self
._test
_dns
_add
_sibling
(1e9
, 0, False)
2723 def test_dns_add_sibling_10_0_days_aging(self
):
2724 self
._test
_dns
_add
_sibling
(10, 0, True)
2726 def test_dns_add_sibling_10_0_days_no_aging(self
):
2727 self
._test
_dns
_add
_sibling
(10, 0, False)
2729 def test_dns_add_sibling_2_7_days_aging_touch(self
):
2730 self
._test
_dns
_add
_sibling
(2, 7, True, True)
2732 def test_dns_add_sibling_2_7_days_no_aging_touch(self
):
2733 self
._test
_dns
_add
_sibling
(2, 7, False, True)
2735 def test_dns_add_sibling_12_7_days_aging_touch(self
):
2736 self
._test
_dns
_add
_sibling
(12, 7, True, True)
2738 def test_dns_add_sibling_12_7_days_no_aging_touch(self
):
2739 self
._test
_dns
_add
_sibling
(12, 7, False, True)
2741 def test_dns_add_sibling_12_3_days_aging_touch(self
):
2742 self
._test
_dns
_add
_sibling
(12, 3, True, True)
2744 def test_dns_add_sibling_12_3_days_no_aging_touch(self
):
2745 self
._test
_dns
_add
_sibling
(12, 3, False, True)
2747 def test_dns_add_sibling_112_7_days_aging_touch(self
):
2748 self
._test
_dns
_add
_sibling
(112, 7, True, True)
2750 def test_dns_add_sibling_112_7_days_no_aging_touch(self
):
2751 self
._test
_dns
_add
_sibling
(112, 7, False, True)
2753 def test_dns_add_sibling_12_113_days_aging_touch(self
):
2754 self
._test
_dns
_add
_sibling
(12, 113, True, True)
2756 def test_dns_add_sibling_12_113_days_no_aging_touch(self
):
2757 self
._test
_dns
_add
_sibling
(12, 113, False, True)
2759 def test_dns_add_sibling_0_7_days_aging_touch(self
):
2760 self
._test
_dns
_add
_sibling
(1e9
, 7, True, True)
2762 def test_dns_add_sibling_0_7_days_no_aging_touch(self
):
2763 self
._test
_dns
_add
_sibling
(1e9
, 7, False, True)
2765 def test_dns_add_sibling_0_0_days_aging_touch(self
):
2766 self
._test
_dns
_add
_sibling
(1e9
, 0, True, True)
2768 def test_dns_add_sibling_0_0_days_no_aging_touch(self
):
2769 self
._test
_dns
_add
_sibling
(1e9
, 0, False, True)
2771 def test_dns_add_sibling_10_0_days_aging_touch(self
):
2772 self
._test
_dns
_add
_sibling
(10, 0, True, True)
2774 def test_dns_add_sibling_10_0_days_no_aging_touch(self
):
2775 self
._test
_dns
_add
_sibling
(10, 0, False, True)
2777 TestProgram(module
=__name__
, opts
=subunitopts
)