ctdb-scripts: Improve update and listing code
[samba4-gss.git] / python / samba / tests / dns_aging.py
blob35d0e5c5380b9888cb532753e1777c1d0f2d639b
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin <kai@samba.org> 2011
3 # Copyright (C) Catalyst.NET 2021
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import sys
20 from samba import dsdb
21 from samba import dsdb_dns
22 from samba.ndr import ndr_unpack, ndr_pack
23 from samba.samdb import SamDB
24 from samba.auth import system_session
25 import ldb
26 from samba import credentials
27 from samba.dcerpc import dns, dnsp, dnsserver
28 from samba.dnsserver import TXTRecord, ARecord
29 from samba.dnsserver import ipv6_normalise
30 from samba.tests.subunitrun import SubunitOptions, TestProgram
31 from samba import werror, WERRORError
32 from samba.tests.dns_base import DNSTest
33 import samba.getopt as options
34 import optparse
35 import time
36 from samba.colour import c_RED, c_GREEN, c_DARK_YELLOW
38 parser = optparse.OptionParser(
39 "dns_aging.py <server name> <server ip> [options]")
40 sambaopts = options.SambaOptions(parser)
41 parser.add_option_group(sambaopts)
44 # use command line creds if available
45 credopts = options.CredentialsOptions(parser)
46 parser.add_option_group(credopts)
47 subunitopts = SubunitOptions(parser)
48 parser.add_option_group(subunitopts)
50 opts, args = parser.parse_args()
51 if len(args) < 2:
52 parser.print_usage()
53 sys.exit(1)
55 LP = sambaopts.get_loadparm()
56 CREDS = credopts.get_credentials(LP)
57 SERVER_NAME = args[0]
58 SERVER_IP = args[1]
59 CREDS.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE)
61 DOMAIN = CREDS.get_realm().lower()
63 # Unix time start, in DNS timestamp (24 * 365.25 * 369)
64 # These are ballpark extremes for the timestamp.
65 DNS_TIMESTAMP_1970 = 3234654
66 DNS_TIMESTAMP_2101 = 4383000
67 DNS_TIMESTAMP_1981 = 3333333 # a middling timestamp
69 IPv4_ADDR = "127.0.0.33"
70 IPv6_ADDR = "::1"
71 IPv4_ADDR_2 = "127.0.0.66"
72 IPv6_ADDR_2 = "1::1"
75 def get_samdb():
76 return SamDB(url=f"ldap://{SERVER_IP}",
77 lp=LP,
78 session_info=system_session(),
79 credentials=CREDS)
82 def get_file_samdb():
83 # For Samba only direct file access, needed for the tombstoning functions.
84 # (For Windows, we instruct it to tombstone over RPC).
85 return SamDB(url=LP.samdb_url(),
86 lp=LP,
87 session_info=system_session(),
88 credentials=CREDS)
91 def get_rpc():
92 return dnsserver.dnsserver(f"ncacn_ip_tcp:{SERVER_IP}[sign]", LP, CREDS)
95 def create_zone(name, rpc=None, aging=True):
96 if rpc is None:
97 rpc = get_rpc()
98 z = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
99 z.pszZoneName = name
100 z.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
101 z.fAging = int(bool(aging))
102 z.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
103 z.fDsIntegrated = 1
104 z.fLoadExisting = 1
105 z.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE
106 rpc.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
108 SERVER_IP,
109 None,
111 'ZoneCreate',
112 dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
116 def delete_zone(name, rpc=None):
117 if rpc is None:
118 rpc = get_rpc()
119 rpc.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
121 SERVER_IP,
122 name,
124 'DeleteZoneFromDs',
125 dnsserver.DNSSRV_TYPEID_NULL,
126 None)
129 def txt_s_list(txt):
130 """Construct a txt record string list, which is a fiddly matter."""
131 if isinstance(txt, str):
132 txt = [txt]
133 s_list = dnsp.string_list()
134 s_list.count = len(txt)
135 s_list.str = txt
136 return s_list
139 def make_txt_record(txt):
140 r = dns.txt_record()
141 r.txt = txt_s_list(txt)
142 return r
145 def copy_rec(rec):
146 copy = dnsserver.DNS_RPC_RECORD()
147 copy.wType = rec.wType
148 copy.dwFlags = rec.dwFlags
149 copy.dwSerial = rec.dwSerial
150 copy.dwTtlSeconds = rec.dwTtlSeconds
151 copy.data = rec.data
152 copy.dwTimeStamp = rec.dwTimeStamp
153 return copy
156 def guess_wtype(data):
157 if isinstance(data, list):
158 data = make_txt_record(data)
159 return (data, dnsp.DNS_TYPE_TXT)
160 if ":" in data:
161 return (data, dnsp.DNS_TYPE_AAAA)
162 return (data, dnsp.DNS_TYPE_A)
165 class TestDNSAging(DNSTest):
166 """Probe DNS aging and scavenging, using LDAP and RPC to set and test
167 the timestamps behind DNS's back."""
168 server = SERVER_NAME
169 server_ip = SERVER_IP
170 creds = CREDS
172 def setUp(self):
173 super().setUp()
174 self.rpc_conn = get_rpc()
175 self.samdb = get_samdb()
177 # We always have a zone of our own named after the test function.
178 self.zone = self.id().rsplit('.', 1)[1]
179 self.addCleanup(delete_zone, self.zone, self.rpc_conn)
180 try:
181 create_zone(self.zone, self.rpc_conn)
182 except WERRORError as e:
183 if e.args[0] != werror.WERR_DNS_ERROR_ZONE_ALREADY_EXISTS:
184 raise
185 print(f"zone {self.zone} already exists")
187 # Though we set this in create_zone(), that doesn't work on
188 # Windows, so we repeat again here.
189 self.set_zone_int_params(AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
191 self.zone_dn = (f"DC={self.zone},CN=MicrosoftDNS,DC=DomainDNSZones,"
192 f"{self.samdb.get_default_basedn()}")
194 def set_zone_int_params(self, zone=None, **kwargs):
195 """Keyword arguments set parameters on the zone. e.g.:
197 self.set_zone_int_params(Aging=1,
198 RefreshInterval=222)
200 See [MS-DNSP] 3.1.1.2.1 "DNS Zone Integer Properties" for names.
202 if zone is None:
203 zone = self.zone
204 for key, val in kwargs.items():
205 name_param = dnsserver.DNS_RPC_NAME_AND_PARAM()
206 name_param.dwParam = val
207 name_param.pszNodeName = key
208 try:
209 self.rpc_conn.DnssrvOperation2(
210 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
212 SERVER_IP,
213 zone,
215 'ResetDwordProperty',
216 dnsserver.DNSSRV_TYPEID_NAME_AND_PARAM,
217 name_param)
218 except WERRORError as e:
219 self.fail(str(e))
221 def rpc_replace(self, name, old=None, new=None):
222 """Replace a DNS_RPC_RECORD or DNS_RPC_RECORD_BUF"""
223 # wrap our recs, if necessary
224 if isinstance(new, dnsserver.DNS_RPC_RECORD):
225 rec = new
226 new = dnsserver.DNS_RPC_RECORD_BUF()
227 new.rec = rec
229 if isinstance(old, dnsserver.DNS_RPC_RECORD):
230 rec = old
231 old = dnsserver.DNS_RPC_RECORD_BUF()
232 old.rec = rec
234 try:
235 self.rpc_conn.DnssrvUpdateRecord2(
236 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
238 SERVER_IP,
239 self.zone,
240 name,
241 new,
242 old)
243 except WERRORError as e:
244 self.fail(f"could not replace record ({e})")
246 def get_unique_txt_record(self, name, txt):
247 """Get the TXT record on Name with value txt, asserting that there is
248 only one."""
249 if isinstance(txt, str):
250 txt = [txt]
251 recs = self.ldap_get_records(name)
253 match = None
254 for r in recs:
255 if r.wType != dnsp.DNS_TYPE_TXT:
256 continue
257 txt2 = [x for x in r.data.str]
258 if txt2 == txt:
259 self.assertIsNone(match)
260 match = r
261 return match
263 def get_unique_ip_record(self, name, addr, wtype=None):
264 """Get an A or AAAA record on name with the matching data."""
265 if wtype is None:
266 addr, wtype = guess_wtype(addr)
268 recs = self.ldap_get_records(name)
270 # We need to use the internal dns_record_match because not all
271 # forms always match on strings (e.g. IPv6)
272 rec = dnsp.DnssrvRpcRecord()
273 rec.wType = wtype
274 rec.data = addr
276 match = None
277 for r in recs:
278 if dsdb_dns.records_match(r, rec):
279 self.assertIsNone(match)
280 match = r
281 return match
283 def dns_query(self, name, qtype=dns.DNS_QTYPE_ALL):
284 """make a query, which might help Windows notice LDAP changes"""
285 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
286 fullname = "%s.%s" % (name, self.zone)
287 q = self.make_name_question(fullname, qtype, dns.DNS_QCLASS_IN)
288 self.finish_name_packet(p, [q])
289 r, rp = self.dns_transaction_udp(p, host=SERVER_IP)
291 return r
293 def dns_update_non_text(self, name,
294 data,
295 wtype=None,
296 qclass=dns.DNS_QCLASS_IN):
297 if wtype is None:
298 data, wtype = guess_wtype(data)
300 if qclass == dns.DNS_QCLASS_IN:
301 ttl = 123
302 else:
303 ttl = 0
305 fullname = "%s.%s" % (name, self.zone)
306 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
307 u = self.make_name_question(self.zone,
308 dns.DNS_QTYPE_SOA,
309 dns.DNS_QCLASS_IN)
310 self.finish_name_packet(p, [u])
312 r = dns.res_rec()
313 r.name = fullname
314 r.rr_type = wtype
315 r.rr_class = qclass
316 r.ttl = ttl
317 if data is not None:
318 r.length = 0xffff
319 r.rdata = data
320 else:
321 r.length = 0
323 p.nscount = 1
324 p.nsrecs = [r]
326 (code, response) = self.dns_transaction_udp(p, host=SERVER_IP)
327 self.assert_dns_rcode_equals(code, dns.DNS_RCODE_OK)
328 return response
330 def dns_delete(self, name, data, wtype=None):
331 return self.dns_update_non_text(name,
332 data,
333 wtype,
334 qclass=dns.DNS_QCLASS_NONE)
336 def dns_delete_type(self, name, wtype):
337 return self.dns_update_non_text(name,
338 None,
339 wtype,
340 qclass=dns.DNS_QCLASS_ANY)
342 def dns_update_record(self, name, txt, ttl=900):
343 if isinstance(txt, str):
344 txt = [txt]
345 p = self.make_txt_update(name, txt, self.zone, ttl=ttl)
346 (code, response) = self.dns_transaction_udp(p, host=SERVER_IP)
347 if code.operation & dns.DNS_RCODE == dns.DNS_RCODE_REFUSED:
348 # sometimes you might forget this
349 print("\n\ngot DNS_RCODE_REFUSED\n")
350 print("Are you running this in the fl2003 environment?\n")
351 print("try `SELFTEST_TESTENV='fl2003dc:local' make testenv`\n\n")
353 self.assert_dns_rcode_equals(code, dns.DNS_RCODE_OK)
354 return self.get_unique_txt_record(name, txt)
356 def rpc_update_record(self, name, txt, **kwargs):
357 """Add the record that self.dns_update_record() would add, via the
358 dnsserver RPC pipe.
360 As with DNS update, if the record already exists, we replace it.
362 if isinstance(txt, str):
363 txt = [txt]
365 old = TXTRecord(txt)
366 rec = TXTRecord(txt)
367 for k, v in kwargs.items():
368 setattr(rec, k, v)
370 try:
371 self.rpc_replace(name, old, rec)
372 except AssertionError as e:
373 # we have caught and wrapped the WERRor inside
374 if 'WERR_DNS_ERROR_RECORD_DOES_NOT_EXIST' not in str(e):
375 raise
376 self.rpc_replace(name, None, rec)
378 return self.get_unique_txt_record(name, txt)
380 def rpc_delete_txt(self, name, txt):
381 if isinstance(txt, str):
382 txt = [txt]
383 old = TXTRecord(txt)
384 self.rpc_replace(name, old, None)
386 def get_one_node(self, name):
387 self.assertIsInstance(name, str)
388 expr = f"(&(objectClass=dnsNode)(name={name}))"
389 nodes = self.samdb.search(base=self.zone_dn,
390 scope=ldb.SCOPE_SUBTREE,
391 expression=expr,
392 attrs=["dnsRecord", "dNSTombstoned", "name"])
394 if len(nodes) > 1:
395 self.fail(
396 f"expected 0 or 1 dnsNodes for {name}, found {len(nodes)}")
398 if len(nodes) == 0:
399 return None
400 return nodes[0]
402 def ldap_get_records(self, name):
403 node = self.get_one_node(name)
404 if node is None:
405 return []
407 records = node.get('dnsRecord')
408 return [ndr_unpack(dnsp.DnssrvRpcRecord, r) for r in records]
410 def ldap_get_non_tombstoned_records(self, name):
411 all_records = self.ldap_get_records(name)
412 records = []
413 for r in all_records:
414 if r.wType != dnsp.DNS_TYPE_TOMBSTONE:
415 records.append(r)
416 return records
418 def assert_tombstoned(self, name, tombstoned=True, timestamp=None):
419 # If run with tombstoned=False, assert it isn't tombstoned
420 # (and has no traces of tombstone). Otherwise assert it has
421 # all the necessary bits.
423 # with timestamp=<non-zero number of hours>, we assert that
424 # the nttime timestamp is about that time.
426 # with timestamp=None, we assert it is within a century or so.
428 # with timestamp=False (or 0), we don't assert on it.
430 node = self.get_one_node(name)
431 if node is None:
432 self.fail(f"no node named {name}")
434 dnsts = node.get("dNSTombstoned")
435 if dnsts is None:
436 is_tombstoned = False
437 else:
438 self.assertEqual(len(dnsts), 1)
439 if dnsts[0] == b'TRUE':
440 is_tombstoned = True
441 else:
442 is_tombstoned = False
444 if tombstoned != is_tombstoned:
445 if is_tombstoned:
446 self.fail(f"{name} is tombstoned")
447 else:
448 self.fail(f"{name} is not tombstoned")
450 recs = self.ldap_get_records(name)
451 if is_tombstoned:
452 self.assertEqual(len(recs), 1)
453 self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TOMBSTONE)
454 if timestamp is None:
455 self.assert_nttime_in_hour_range(recs[0].data)
456 elif timestamp:
457 self.assert_nttime_in_hour_range(recs[0].data,
458 timestamp - 3,
459 timestamp + 3)
461 else:
462 for r in recs:
463 self.assertNotEqual(recs[0].wType, dnsp.DNS_TYPE_TOMBSTONE)
465 def ldap_replace_records(self, name, records):
466 # We use raw ldap to avoid the "helpfulness" of dsdb_dns.replace()
468 dn = f'DC={name},{self.zone_dn}'
470 msg = ldb.Message.from_dict(self.samdb,
471 {'dn': dn,
472 'dnsRecord': [ndr_pack(r) for r in records]
474 ldb.FLAG_MOD_REPLACE)
476 try:
477 self.samdb.modify(msg)
478 except ldb.LdbError as e:
479 if 'LDAP_NO_SUCH_OBJECT' not in e.args[1]:
480 raise
481 # We need to do an add
482 msg["objectClass"] = ["top", "dnsNode"]
483 msg["dnsRecord"].set_flags(ldb.FLAG_MOD_ADD)
484 self.samdb.add(msg)
486 def ldap_update_core(self, name, wtype, data, **kwargs):
487 """This one is not TXT specific."""
488 records = self.ldap_get_records(name)
490 # default values
491 rec = dnsp.DnssrvRpcRecord()
492 rec.wType = wtype
493 rec.rank = dnsp.DNS_RANK_ZONE
494 rec.dwTtlSeconds = 900
495 rec.dwSerial = 110
496 rec.dwTimeStamp = 0
497 rec.data = data
499 # override defaults, as required
500 for k, v in kwargs.items():
501 setattr(rec, k, v)
503 for i, r in enumerate(records[:]):
504 if dsdb_dns.records_match(r, rec):
505 records[i] = rec
506 break
507 else: # record not found
508 records.append(rec)
510 self.ldap_replace_records(name, records)
511 return rec
513 def ldap_update_record(self, name, txt, **kwargs):
514 """Add the record that self.dns_update_record() would add, via ldap,
515 thus allowing us to set additional dnsRecord features like
516 dwTimestamp.
518 rec = self.ldap_update_core(name,
519 dnsp.DNS_TYPE_TXT,
520 txt_s_list(txt),
521 **kwargs)
523 recs = self.ldap_get_records(name)
524 match = None
525 for r in recs:
526 if r.wType != rec.wType:
527 continue
528 if r.data.str == rec.data.str:
529 self.assertIsNone(match, f"duplicate records for {name}")
530 match = r
531 self.assertEqual(match.rank, rec.rank & 255)
532 self.assertEqual(match.dwTtlSeconds, rec.dwTtlSeconds)
533 self.assert_timestamps_equal(match.dwTimeStamp, rec.dwTimeStamp)
534 return match
536 def ldap_delete_record(self, name, data, wtype=dnsp.DNS_TYPE_TXT):
537 rec = dnsp.DnssrvRpcRecord()
538 if wtype == dnsp.DNS_TYPE_TXT:
539 data = txt_s_list(data)
541 rec.wType = wtype
542 rec.data = data
543 records = self.ldap_get_records(name)
544 for i, r in enumerate(records[:]):
545 if dsdb_dns.records_match(r, rec):
546 del records[i]
547 break
548 else:
549 self.fail(f"record {data} not found")
551 self.ldap_replace_records(name, records)
553 def add_ip_record(self, name, addr, wtype=None, **kwargs):
554 if wtype is None:
555 addr, wtype = guess_wtype(addr)
556 rec = self.ldap_update_core(name,
557 wtype,
558 addr,
559 **kwargs)
561 recs = self.ldap_get_records(name)
562 match = None
563 for r in recs:
564 if dsdb_dns.records_match(r, rec):
565 self.assertIsNone(match, f"duplicate records for {name}")
566 match = r
567 self.assertEqual(match.rank, rec.rank & 255)
568 self.assertEqual(match.dwTtlSeconds, rec.dwTtlSeconds)
569 self.assert_timestamps_equal(match.dwTimeStamp, rec.dwTimeStamp)
570 return match
572 def ldap_modify_timestamps(self, name, delta):
573 records = self.ldap_get_records(name)
574 for rec in records:
575 rec.dwTimeStamp += delta
576 self.ldap_replace_records(name, records)
578 def get_rpc_records(self, name, dns_type=None):
579 if dns_type is None:
580 dns_type = dnsp.DNS_TYPE_ALL
581 select_flags = dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA
582 buflen, res = self.rpc_conn.DnssrvEnumRecords2(
583 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
585 SERVER_IP,
586 self.zone,
587 name,
588 None,
589 dns_type,
590 select_flags,
591 None,
592 None)
593 recs = []
594 if not res or res.count == 0:
595 return []
596 for rec in res.rec:
597 recs.extend(rec.records)
598 return recs
600 def dns_tombstone(self, name,
601 epoch_hours=DNS_TIMESTAMP_1981,
602 epoch_nttime=None):
603 dn = f'DC={name},{self.zone_dn}'
604 r = dnsp.DnssrvRpcRecord()
605 r.wType = dnsp.DNS_TYPE_TOMBSTONE
606 # r.dwTimeStamp is a 32 bit value in hours, and r.data is an
607 # NTTIME (100 nanosecond intervals), both in the 1601 epoch. A
608 # tombstone will have both, but expiration calculations use
609 # the r.data NTTIME EntombedTime timestamp (see [MS-DNSP]).
610 r.dwTimeStamp = epoch_hours
611 if epoch_nttime is None:
612 r.data = epoch_hours * 3600 * 10 * 1000 * 1000
613 else:
614 r.data = epoch_nttime
616 msg = ldb.Message.from_dict(self.samdb,
617 {'dn': dn,
618 'dnsRecord': [ndr_pack(r)],
619 'dnsTombstoned': 'TRUE'
621 ldb.FLAG_MOD_REPLACE)
622 try:
623 self.samdb.modify(msg)
624 except ldb.LdbError as e:
625 if 'LDAP_NO_SUCH_OBJECT' not in e.args[1]:
626 raise
627 # We need to do an add
628 msg["objectClass"] = ["top", "dnsNode"]
629 self.samdb.add(msg)
631 def set_aging(self, enable=False):
632 self.set_zone_int_params(Aging=int(bool(enable)))
634 def assert_timestamp_in_ballpark(self, rec):
635 self.assertGreater(rec.dwTimeStamp, DNS_TIMESTAMP_1970)
636 self.assertLess(rec.dwTimeStamp, DNS_TIMESTAMP_2101)
638 def assert_nttime_in_hour_range(self, t,
639 hour_min=DNS_TIMESTAMP_1970,
640 hour_max=DNS_TIMESTAMP_2101):
641 t //= int(3600 * 1e7)
642 self.assertGreater(t, hour_min)
643 self.assertLess(t, hour_max)
645 def assert_soon_after(self, timestamp, reference):
646 """Assert that a timestamp is the same or very slightly higher than a
647 reference timestamp.
649 Typically we expect the timestamps to be identical, unless an
650 hour has clicked over since the reference was taken. However
651 we allow one more hour in case it happens during a daylight
652 savings transition or something.
654 if hasattr(timestamp, 'dwTimeStamp'):
655 timestamp = timestamp.dwTimeStamp
656 if hasattr(reference, 'dwTimeStamp'):
657 reference = reference.dwTimeStamp
659 diff = timestamp - reference
660 days = abs(diff / 24.0)
662 if diff < 0:
663 msg = f"timestamp is {days} days ({abs(diff)} hours) before reference"
664 elif diff > 2:
665 msg = f"timestamp is {days} days ({diff} hours) after reference"
666 else:
667 return
668 raise AssertionError(msg)
670 def assert_timestamps_equal(self, ts1, ts2):
671 """Just like assertEqual(), but tells us the difference, not the
672 absolute values. e.g:
674 self.assertEqual(a, b)
675 AssertionError: 3685491 != 3685371
677 self.assert_timestamps_equal(a, b)
678 AssertionError: -120 (first is 5.0 days earlier than second)
680 Also, we turn a record into a timestamp if we need
682 if hasattr(ts1, 'dwTimeStamp'):
683 ts1 = ts1.dwTimeStamp
684 if hasattr(ts2, 'dwTimeStamp'):
685 ts2 = ts2.dwTimeStamp
687 if ts1 == ts2:
688 return
690 diff = ts1 - ts2
691 days = abs(diff / 24.0)
692 if ts1 == 0 or ts2 == 0:
693 # when comparing to zero we don't want the number of days.
694 msg = f"timestamp {ts1} != {ts2}"
695 elif diff > 0:
696 msg = f"{ts1} is {days} days ({diff} hours) after {ts2}"
697 else:
698 msg = f"{ts1} is {days} days ({abs(diff)} hours) before {ts2}"
700 raise AssertionError(msg)
702 def test_update_timestamps_aging_off_then_on(self):
703 # we will add a record with aging off
704 # it will have the current timestamp
705 self.set_aging(False)
706 name = 'timestamp-now'
707 name2 = 'timestamp-eightdays'
709 rec = self.dns_update_record(name, [name])
710 start_time = rec.dwTimeStamp
711 self.assert_timestamp_in_ballpark(rec)
712 # alter the timestamp -8 days using RPC
713 # with aging turned off, we expect no change
714 # when aging is on, we expect change
715 eight_days_ago = start_time - 8 * 24
716 rec = self.ldap_update_record(name2, [name2],
717 dwTimeStamp=eight_days_ago)
719 self.assert_timestamps_equal(rec.dwTimeStamp, eight_days_ago)
721 # if aging was on, this would change
722 rec = self.dns_update_record(name2, [name2])
723 self.assert_timestamps_equal(rec.dwTimeStamp, eight_days_ago)
725 self.set_aging(True)
726 rec = self.dns_update_record(name2, [name2])
727 self.assertGreaterEqual(rec.dwTimeStamp, start_time)
729 def test_rpc_update_timestamps(self):
730 # RPC always sets timestamps to zero on Windows.
731 self.set_aging(False)
732 name = 'timestamp-now'
734 rec = self.dns_update_record(name, [name])
735 start_time = rec.dwTimeStamp
736 self.assert_timestamp_in_ballpark(rec)
737 # attempt to alter the timestamp to something close by.
738 eight_days_ago = start_time - 8 * 24
739 rec = self.rpc_update_record(name, [name],
740 dwTimeStamp=eight_days_ago)
741 self.assertEqual(rec.dwTimeStamp, 0)
743 # try again, with aging on
744 self.set_aging(True)
745 rec = self.rpc_update_record(name, [name],
746 dwTimeStamp=eight_days_ago)
747 self.assertEqual(rec.dwTimeStamp, 0)
749 # now that the record is static, a dns update won't change it
750 rec = self.dns_update_record(name, [name])
751 self.assertEqual(rec.dwTimeStamp, 0)
753 # but another record on the same node will behave normally
754 # i.e. the node is not static, the record is.
755 name2 = 'timestamp-eightdays'
756 rec = self.dns_update_record(name2, [name2])
757 self.assert_soon_after(rec.dwTimeStamp,
758 start_time)
760 def get_txt_timestamps(self, name, *txts):
761 records = self.ldap_get_records(name)
763 ret = []
764 for t in txts:
765 for r in records:
766 t2 = [x for x in r.data.str]
767 if t == t2:
768 ret.append(r.dwTimeStamp)
769 return ret
771 def test_update_aging_disabled_2(self):
772 # With aging disabled, Windows updates the timestamps of all
773 # records when one is updated.
774 name = 'test'
775 txt1 = ['test txt']
776 txt2 = ['test', 'txt2']
777 txt3 = ['test', 'txt3']
779 self.set_aging(False)
781 current_time = self.dns_update_record(name, txt1).dwTimeStamp
783 six_days_ago = current_time - 6 * 24
784 eight_days_ago = current_time - 8 * 24
785 fifteen_days_ago = current_time - 15 * 24
786 hundred_days_ago = current_time - 100 * 24
787 thousand_days_ago = current_time - 1000 * 24
789 for timestamp in (current_time,
790 six_days_ago,
791 eight_days_ago,
792 fifteen_days_ago,
793 hundred_days_ago,
794 thousand_days_ago):
795 # wind back
796 self.ldap_update_record(name, txt1, dwTimeStamp=timestamp)
797 self.assertEqual(self.get_txt_timestamps(name, txt1), [timestamp])
799 # no change here
800 update_timestamp = self.dns_update_record(name, txt1).dwTimeStamp
801 self.assert_timestamps_equal(update_timestamp, timestamp)
803 # adding a fresh record
804 for timestamp in (current_time,
805 six_days_ago,
806 eight_days_ago,
807 fifteen_days_ago,
808 hundred_days_ago,
809 thousand_days_ago,
810 100000,
811 100):
812 # wind back
813 timestamp1 = self.ldap_update_record(
814 name,
815 txt1,
816 dwTimeStamp=timestamp).dwTimeStamp
817 self.assert_timestamps_equal(timestamp1, timestamp)
819 self.dns_update_record(name, txt2)
820 timestamps = self.get_txt_timestamps(name, txt1, txt2)
821 self.assertEqual(timestamps, [timestamp, current_time])
823 self.ldap_delete_record(name, txt2)
824 timestamps = self.get_txt_timestamps(name, txt1)
825 self.assertEqual(timestamps, [timestamp])
827 # add record 2.
828 timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
829 self.assert_soon_after(timestamp2, current_time)
831 for timestamp in (current_time,
832 six_days_ago,
833 eight_days_ago,
834 fifteen_days_ago,
835 hundred_days_ago,
836 thousand_days_ago,
837 100000,
838 100):
839 # wind back
840 self.ldap_update_record(name, txt1, dwTimeStamp=timestamp)
841 timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
842 self.assert_timestamps_equal(timestamp1, timestamp)
844 timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
845 # txt1 timestamp is now current time
846 timestamps = self.get_txt_timestamps(name, txt1, txt2)
847 self.assertEqual(timestamps, [timestamp, current_time])
849 # with 3 records, no change
850 for timestamp in (current_time,
851 six_days_ago,
852 eight_days_ago,
853 fifteen_days_ago,
854 hundred_days_ago,
855 thousand_days_ago,
856 100000,
857 10):
858 # wind back
859 self.ldap_update_record(name, txt1, dwTimeStamp=timestamp)
860 self.ldap_update_record(name, txt2, dwTimeStamp=timestamp)
861 self.ldap_update_record(name, txt3, dwTimeStamp=(timestamp + 30))
862 timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
863 self.assert_timestamps_equal(timestamp3, timestamp + 30)
865 self.dns_update_record(name, txt2).dwTimeStamp
866 timestamps = self.get_txt_timestamps(name, txt1, txt2, txt3)
867 self.assertEqual(timestamps, [timestamp,
868 timestamp,
869 timestamp + 30])
871 # with 3 records, one of which is static
872 # first we set the updatee's timestamp to a recognisable number
873 self.ldap_update_record(name, txt2, dwTimeStamp=999999)
874 for timestamp in (current_time,
875 six_days_ago,
876 eight_days_ago,
877 fifteen_days_ago,
878 hundred_days_ago,
879 thousand_days_ago,
880 100000,
881 10):
882 # wind back
883 self.ldap_update_record(name, txt1, dwTimeStamp=0)
884 self.ldap_update_record(name, txt3, dwTimeStamp=(timestamp - 9))
885 timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
886 self.assert_timestamps_equal(timestamp3, timestamp - 9)
888 self.dns_update_record(name, txt2)
889 timestamps = self.get_txt_timestamps(name, txt1, txt2, txt3)
890 self.assertEqual(timestamps, [0,
891 999999,
892 timestamp - 9])
894 # with 3 records, updating one which is static
895 timestamp3 = self.dns_update_record(name, txt3).dwTimeStamp
896 for timestamp in (current_time,
897 six_days_ago,
898 eight_days_ago,
899 fifteen_days_ago,
900 hundred_days_ago,
901 thousand_days_ago,
902 100000,
903 10):
904 # wind back
905 self.ldap_update_record(name, txt1, dwTimeStamp=0)
906 self.ldap_update_record(name, txt2, dwTimeStamp=0)
907 self.ldap_update_record(name, txt3, dwTimeStamp=(timestamp + 30))
908 timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
909 self.assert_timestamps_equal(timestamp3, timestamp + 30)
911 self.dns_update_record(name, txt2).dwTimeStamp
912 timestamps = self.get_txt_timestamps(name, txt1, txt2, txt3)
913 self.assertEqual(timestamps, [0,
915 timestamp + 30])
917 # with 3 records, after the static nodes have been replaced
918 self.ldap_update_record(name, txt1, dwTimeStamp=777777)
919 self.ldap_update_record(name, txt2, dwTimeStamp=888888)
920 timestamp3 = self.dns_update_record(name, txt3).dwTimeStamp
921 for timestamp in (current_time,
922 six_days_ago,
923 eight_days_ago,
924 fifteen_days_ago,
925 hundred_days_ago,
926 thousand_days_ago,
927 100000,
928 10):
929 # wind back
930 self.ldap_update_record(name, txt3, dwTimeStamp=(timestamp))
931 timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
932 self.assert_timestamps_equal(timestamp3, timestamp)
934 self.dns_update_record(name, txt2)
935 timestamps = self.get_txt_timestamps(name, txt1, txt2, txt3)
936 self.assertEqual(timestamps, [777777,
937 888888,
938 timestamp])
940 def _test_update_aging_disabled_n_days_ago(self, n_days):
941 name = 'test'
942 txt1 = ['1']
943 txt2 = ['2']
945 self.set_aging(False)
946 current_time = self.dns_update_record(name, txt1).dwTimeStamp
948 # rewind timestamp using ldap
949 self.ldap_modify_timestamps(name, n_days * -24)
950 n_days_ago = self.get_unique_txt_record(name, txt1).dwTimeStamp
951 self.assertGreater(current_time, n_days_ago)
953 # no change when updating this record
954 update_timestamp = self.dns_update_record(name, txt1).dwTimeStamp
955 self.assert_timestamps_equal(update_timestamp, n_days_ago)
957 # add another record, which should have the current timestamp
958 timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
959 self.assert_soon_after(timestamp2, current_time)
961 # get the original record timestamp. NOW it matches current_time
962 timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
963 self.assert_timestamps_equal(timestamp1, timestamp2)
965 # let's repeat that, this time with txt2 existing
966 self.ldap_update_record(name, txt1, dwTimeStamp=n_days_ago)
968 timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
969 self.assert_timestamps_equal(timestamp1, n_days_ago)
971 # this update is not an add
972 timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
973 self.assert_soon_after(timestamp2, current_time)
975 # now timestamp1 is not changed
976 timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
977 self.assert_timestamps_equal(timestamp1, n_days_ago)
979 # delete record2, try again
980 self.ldap_delete_record(name, txt2)
981 self.ldap_update_record(name, txt1, dwTimeStamp=n_days_ago)
983 timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
984 self.assert_timestamps_equal(timestamp1, n_days_ago)
986 # here we are re-adding the deleted record
987 timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
988 self.assert_soon_after(timestamp2, current_time)
990 timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
992 # It gets weird HERE.
993 # note how the SIBLING of the deleted, re-added record differs
994 # from the sibling of freshly added record, depending on the
995 # time difference.
996 if n_days <= 7:
997 self.assert_timestamps_equal(timestamp1, n_days_ago)
998 else:
999 self.assert_timestamps_equal(timestamp1, timestamp2)
1001 # re-timestamp record2, try again
1002 self.ldap_update_record(name, txt2, dwTimeStamp=n_days_ago)
1003 self.ldap_update_record(name, txt1, dwTimeStamp=n_days_ago)
1005 timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
1006 self.assert_timestamps_equal(timestamp1, n_days_ago)
1008 # no change
1009 timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
1010 self.assert_timestamps_equal(timestamp2, n_days_ago)
1011 # also no change
1012 timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
1013 self.assert_timestamps_equal(timestamp1, timestamp2)
1015 # let's introduce another record
1016 txt3 = ['3']
1017 self.ldap_update_record(name, txt2, dwTimeStamp=n_days_ago)
1018 self.ldap_update_record(name, txt1, dwTimeStamp=n_days_ago)
1020 timestamp3 = self.dns_update_record(name, txt3).dwTimeStamp
1021 self.assert_soon_after(timestamp3, current_time)
1023 timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
1024 timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
1026 if n_days <= 7:
1027 self.assert_timestamps_equal(timestamp1, n_days_ago)
1028 else:
1029 self.assert_timestamps_equal(timestamp1, timestamp3)
1031 self.assert_timestamps_equal(timestamp2, timestamp3)
1033 self.ldap_delete_record(name, txt3)
1034 timestamp3 = self.dns_update_record(name, txt3).dwTimeStamp
1035 self.assert_soon_after(timestamp3, current_time)
1036 timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
1037 timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
1039 if n_days <= 7:
1040 self.assert_timestamps_equal(timestamp1, n_days_ago)
1041 else:
1042 self.assert_timestamps_equal(timestamp1, timestamp3)
1044 self.assert_timestamps_equal(timestamp2, timestamp3)
1046 # and here we'll make txt3 static
1047 txt4 = ['4']
1049 # and here we'll make txt1 static
1050 self.ldap_update_record(name, txt1, dwTimeStamp=0)
1051 self.ldap_update_record(name, txt2, dwTimeStamp=n_days_ago)
1052 self.ldap_update_record(name, txt3, dwTimeStamp=n_days_ago)
1053 timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
1054 timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
1055 timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
1056 timestamp4 = self.dns_update_record(name, txt4).dwTimeStamp
1058 self.assertEqual(timestamp1, 0)
1059 self.assert_timestamps_equal(timestamp2, n_days_ago)
1060 self.assert_timestamps_equal(timestamp3, n_days_ago)
1061 self.assert_soon_after(timestamp4, current_time)
1063 def test_update_aging_disabled_in_no_refresh_window(self):
1064 self._test_update_aging_disabled_n_days_ago(4)
1066 def test_update_aging_disabled_on_no_refresh_boundary(self):
1067 self._test_update_aging_disabled_n_days_ago(7)
1069 def test_update_aging_disabled_in_refresh_window(self):
1070 self._test_update_aging_disabled_n_days_ago(9)
1072 def test_update_aging_disabled_beyond_refresh_window(self):
1073 self._test_update_aging_disabled_n_days_ago(16)
1075 def test_update_aging_disabled_in_eighteenth_century(self):
1076 self._test_update_aging_disabled_n_days_ago(100000)
1078 def test_update_aging_disabled_static(self):
1079 name = 'test'
1080 txt1 = ['1']
1081 txt2 = ['2']
1083 self.set_aging(False)
1085 current_time = self.dns_update_record(name, txt1).dwTimeStamp
1086 self.ldap_update_record(name, txt1, dwTimeStamp=0)
1088 # no change when updating this record
1089 timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
1090 self.assertEqual(timestamp1, 0)
1092 # add another record, which should have the current timestamp
1093 timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
1094 self.assert_soon_after(timestamp2, current_time)
1096 timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
1097 self.assert_soon_after(timestamp1, current_time)
1099 # let's repeat that, this time with txt2 existing
1100 timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
1101 self.assert_soon_after(timestamp2, current_time)
1103 timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
1104 self.assert_soon_after(timestamp2, current_time)
1106 # delete record2, try again
1107 self.ldap_delete_record(name, txt2)
1108 self.ldap_update_record(name, txt1, dwTimeStamp=0)
1109 # no change when updating this record
1110 timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
1111 self.assertEqual(timestamp1, 0)
1113 timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
1114 self.assertEqual(timestamp2, 0)
1116 timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
1117 self.assertEqual(timestamp1, 0)
1118 # re-timestamp record2, try again
1119 self.ldap_update_record(name, txt2, dwTimeStamp=1)
1120 self.ldap_update_record(name, txt1, dwTimeStamp=0)
1121 # no change when updating this record
1122 timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
1123 self.assert_timestamps_equal(timestamp2, 1)
1125 def test_update_aging_disabled(self):
1126 # With aging disabled, Windows updates the timestamps of all
1127 # records when one is updated.
1128 name = 'test'
1129 txt1 = ['test txt']
1130 txt2 = ['test', 'txt2']
1131 txt3 = ['test', 'txt3']
1132 minus_6 = -6 * 24
1133 minus_8 = -8 * 24
1135 self.set_aging(False)
1137 current_time = self.dns_update_record(name, txt1).dwTimeStamp
1139 # rewind timestamp using ldap
1140 self.ldap_modify_timestamps(name, minus_6)
1141 after_mod = self.get_unique_txt_record(name, txt1)
1142 six_days_ago = after_mod.dwTimeStamp
1143 self.assert_timestamps_equal(six_days_ago, current_time + minus_6)
1145 # no change
1146 update_timestamp = self.dns_update_record(name, txt1).dwTimeStamp
1147 self.assert_timestamps_equal(update_timestamp, six_days_ago)
1149 self.check_query_txt(name, txt1, zone=self.zone)
1151 # another record
1152 timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
1153 self.assert_soon_after(timestamp2, current_time)
1155 timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
1156 # without aging, timestamp1 is changed!!
1157 self.assert_timestamps_equal(timestamp1, timestamp2)
1159 # Set both records back to 8 days ago.
1160 self.ldap_modify_timestamps(name, minus_8)
1162 eight_days_ago = self.get_unique_txt_record(name, txt1).dwTimeStamp
1163 self.assert_timestamps_equal(eight_days_ago, current_time + minus_8)
1165 update2 = self.dns_update_record(name, txt2)
1167 # Without aging on, an update should not change the timestamps.
1168 self.assert_timestamps_equal(update2.dwTimeStamp, eight_days_ago)
1169 timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
1170 self.assert_timestamps_equal(timestamp1, eight_days_ago)
1172 # Add another txt record. The new record should have the now
1173 # timestamp, and drag the others up with it.
1174 timestamp3 = self.dns_update_record(name, txt3).dwTimeStamp
1175 self.assert_soon_after(timestamp3, current_time)
1176 timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
1177 timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
1178 self.assert_timestamps_equal(timestamp1, timestamp3)
1179 self.assert_timestamps_equal(timestamp2, timestamp3)
1181 hundred_days_ago = current_time - 100 * 24
1182 thousand_days_ago = current_time - 1000 * 24
1183 record = self.ldap_update_record(name, txt1,
1184 dwTimeStamp=hundred_days_ago)
1185 self.assert_timestamps_equal(record.dwTimeStamp, hundred_days_ago)
1186 record = self.ldap_update_record(name, txt2,
1187 dwTimeStamp=thousand_days_ago)
1188 self.assert_timestamps_equal(record.dwTimeStamp, thousand_days_ago)
1190 # update 3, will others change (because beyond RefreshInterval)? yes.
1191 timestamp3 = self.dns_update_record(name, txt3).dwTimeStamp
1192 timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
1193 timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
1194 self.assert_soon_after(timestamp3, current_time)
1195 self.assert_timestamps_equal(timestamp1, hundred_days_ago)
1196 self.assert_timestamps_equal(timestamp2, thousand_days_ago)
1198 fifteen_days_ago = current_time - 15 * 24
1199 self.ldap_update_record(name, txt3, dwTimeStamp=fifteen_days_ago)
1201 timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
1202 timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
1203 timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
1204 # DNS update has no effect because all records are old
1205 self.assert_timestamps_equal(timestamp2, thousand_days_ago)
1206 self.assert_timestamps_equal(timestamp1, hundred_days_ago)
1207 self.assert_timestamps_equal(timestamp3, fifteen_days_ago)
1209 # Does update of old record affect timestamp of refreshable record? No.
1210 self.ldap_update_record(name, txt3, dwTimeStamp=eight_days_ago)
1211 timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
1212 timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
1213 timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
1214 # DNS update has no effect because all records are old
1215 self.assert_timestamps_equal(timestamp2, thousand_days_ago)
1216 self.assert_timestamps_equal(timestamp1, hundred_days_ago)
1217 self.assert_timestamps_equal(timestamp3, eight_days_ago)
1219 # RPC zeros timestamp, after which updates won't change it.
1220 # BUT it refreshes all others!
1221 self.rpc_update_record(name, txt2)
1223 timestamp2 = self.dns_update_record(name, txt3).dwTimeStamp
1224 timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
1225 timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
1226 self.assertEqual(timestamp2, 0)
1227 self.assert_soon_after(timestamp1, current_time)
1228 self.assert_timestamps_equal(timestamp3, eight_days_ago)
1230 def test_update_aging_enabled(self):
1231 name = 'test'
1232 txt1 = ['test txt']
1233 txt2 = ['test', 'txt2']
1234 txt3 = ['test', 'txt3']
1235 txt4 = ['4']
1237 self.set_aging(True)
1239 current_time = self.dns_update_record(name, txt2).dwTimeStamp
1241 six_days_ago = current_time - 6 * 24
1242 eight_days_ago = current_time - 8 * 24
1243 fifteen_days_ago = current_time - 15 * 24
1244 hundred_days_ago = current_time - 100 * 24
1246 self.ldap_update_record(name, txt1, dwTimeStamp=six_days_ago)
1248 # with or without aging, a delta of -6 days does not affect
1249 # timestamps, because dwNoRefreshInterval is 7 days.
1250 timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
1251 timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
1253 self.assert_timestamps_equal(timestamp1, six_days_ago)
1254 self.assert_soon_after(timestamp2, current_time)
1256 self.ldap_update_record(name, txt3, dwTimeStamp=eight_days_ago)
1257 timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
1258 self.assert_timestamps_equal(timestamp3, eight_days_ago)
1260 # update 1, what happens to 2 and 3? Nothing?
1261 timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
1262 timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
1263 timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
1264 self.assert_timestamps_equal(timestamp1, six_days_ago)
1265 self.assert_soon_after(timestamp2, current_time)
1266 self.assert_timestamps_equal(timestamp3, eight_days_ago)
1268 # now set 1 to 8 days, and we should see changes
1269 self.ldap_update_record(name, txt1, dwTimeStamp=eight_days_ago)
1271 # update 1, what happens to 2 and 3? Nothing?
1272 timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
1273 timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
1274 timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
1275 self.assert_soon_after(timestamp1, current_time)
1276 self.assert_soon_after(timestamp2, current_time)
1277 self.assert_timestamps_equal(timestamp3, eight_days_ago)
1279 # next few ones use these numbers
1280 self.ldap_update_record(name, txt1, dwTimeStamp=fifteen_days_ago)
1281 self.ldap_update_record(name, txt2, dwTimeStamp=six_days_ago)
1282 self.ldap_update_record(name, txt3, dwTimeStamp=eight_days_ago)
1284 # change even though 1 is outside the window
1285 timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
1286 timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
1287 timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
1288 self.assert_soon_after(timestamp1, current_time)
1289 self.assert_timestamps_equal(timestamp2, six_days_ago)
1290 self.assert_timestamps_equal(timestamp3, eight_days_ago)
1292 # reset 1
1293 self.ldap_update_record(name, txt1, dwTimeStamp=fifteen_days_ago)
1295 # no change, because 2 is outside the window
1296 timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
1297 timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
1298 timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
1299 self.assert_timestamps_equal(timestamp1, fifteen_days_ago)
1300 self.assert_timestamps_equal(timestamp2, six_days_ago)
1301 self.assert_timestamps_equal(timestamp3, eight_days_ago)
1303 # 3 changes, others do not
1304 timestamp3 = self.dns_update_record(name, txt3).dwTimeStamp
1305 timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
1306 timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
1307 self.assert_timestamps_equal(timestamp1, fifteen_days_ago)
1308 self.assert_timestamps_equal(timestamp2, six_days_ago)
1309 self.assert_soon_after(timestamp3, current_time)
1311 # reset 3 to 100 days
1312 self.ldap_update_record(name, txt3, dwTimeStamp=hundred_days_ago)
1314 # 3 changes, others do not
1315 timestamp3 = self.dns_update_record(name, txt3).dwTimeStamp
1316 timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
1317 timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
1318 self.assert_timestamps_equal(timestamp1, fifteen_days_ago)
1319 self.assert_timestamps_equal(timestamp2, six_days_ago)
1320 self.assert_soon_after(timestamp3, current_time)
1322 # reset 1 and 3 to 8 days. does update of 1 affect 3?
1323 self.ldap_update_record(name, txt1, dwTimeStamp=eight_days_ago)
1324 self.ldap_update_record(name, txt3, dwTimeStamp=eight_days_ago)
1326 # 1 changes, others do not
1327 timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
1328 timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
1329 timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
1330 self.assert_soon_after(timestamp1, current_time)
1331 self.assert_timestamps_equal(timestamp2, six_days_ago)
1332 self.assert_timestamps_equal(timestamp3, eight_days_ago)
1334 # Try an RPC update, zeroing 1 --> what happens to 3?
1335 timestamp1 = self.rpc_update_record(name, txt1).dwTimeStamp
1336 timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
1337 timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
1338 self.assertEqual(timestamp1, 0)
1339 self.assert_timestamps_equal(timestamp2, six_days_ago)
1340 self.assert_timestamps_equal(timestamp3, eight_days_ago)
1342 # with 2 and 3 at 8 days, does static record change things?
1343 self.ldap_update_record(name, txt2, dwTimeStamp=eight_days_ago)
1344 # 2 changes, but to zero!
1345 timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
1346 timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
1347 timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
1348 self.assert_timestamps_equal(timestamp1, 0)
1349 self.assert_timestamps_equal(timestamp2, 0)
1350 self.assert_timestamps_equal(timestamp3, eight_days_ago)
1352 self.ldap_update_record(name, txt2, dwTimeStamp=six_days_ago)
1353 self.ldap_update_record(name, txt1, dwTimeStamp=3000000)
1354 timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
1355 self.assert_timestamps_equal(timestamp1, 3000000)
1357 # dns update remembers that node is static, even with no
1358 # static records.
1359 timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
1360 self.assertEqual(timestamp1, 0)
1362 # Add another txt record. The new record should have the now
1363 # timestamp, and the others should remain unchanged.
1364 # BUT somehow record 1 is static!?
1365 timestamp4 = self.dns_update_record(name, txt4).dwTimeStamp
1366 timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
1367 timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
1368 timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
1369 self.assert_timestamps_equal(timestamp1, 0)
1370 self.assert_timestamps_equal(timestamp2, six_days_ago)
1371 self.assert_timestamps_equal(timestamp3, eight_days_ago)
1372 self.assert_timestamps_equal(timestamp4, 0)
1374 def _test_update_aging_enabled_n_days_ago(self, n_days):
1375 name = 'test'
1376 txt1 = ['1']
1377 txt2 = ['2']
1378 delta = n_days * -24
1380 self.set_aging(True)
1381 current_time = self.dns_update_record(name, txt1).dwTimeStamp
1383 # rewind timestamp using ldap
1384 self.ldap_modify_timestamps(name, delta)
1385 n_days_ago = self.get_unique_txt_record(name, txt1).dwTimeStamp
1386 self.assertGreater(current_time, n_days_ago)
1388 # update changes timestamp depending on time.
1389 timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
1390 if n_days <= 7:
1391 self.assert_timestamps_equal(timestamp1, n_days_ago)
1392 else:
1393 self.assert_soon_after(timestamp1, current_time)
1395 # add another record, which should have the current timestamp
1396 timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
1397 self.assert_soon_after(timestamp2, current_time)
1399 # first record should not have changed
1400 timestamp1_b = self.get_unique_txt_record(name, txt1).dwTimeStamp
1401 self.assert_timestamps_equal(timestamp1, timestamp1_b)
1403 # let's repeat that, this time with txt2 existing
1404 self.ldap_update_record(name, txt1, dwTimeStamp=n_days_ago)
1406 timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
1407 self.assert_timestamps_equal(timestamp1, timestamp1_b)
1409 # this update is not an add. record 2 is already up-to-date
1410 timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
1411 self.assert_soon_after(timestamp2, current_time)
1413 # now timestamp1 is not changed
1414 timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
1415 self.assert_timestamps_equal(timestamp1, timestamp1_b)
1417 # delete record2, try again
1418 self.ldap_delete_record(name, txt2)
1419 self.ldap_update_record(name, txt1, dwTimeStamp=n_days_ago)
1421 timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
1422 if n_days <= 7:
1423 self.assert_timestamps_equal(timestamp1, n_days_ago)
1424 else:
1425 self.assert_soon_after(timestamp1, current_time)
1427 # here we are re-adding the deleted record
1428 timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
1429 self.assert_soon_after(timestamp2, current_time)
1431 timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
1433 # It gets weird HERE.
1434 # note how the SIBLING of the deleted, re-added record differs
1435 # from the sibling of freshly added record, depending on the
1436 # time difference.
1437 if n_days <= 7:
1438 self.assert_timestamps_equal(timestamp1, n_days_ago)
1439 else:
1440 self.assert_timestamps_equal(timestamp1, timestamp2)
1442 # re-timestamp record2, try again
1443 self.ldap_update_record(name, txt2, dwTimeStamp=n_days_ago)
1444 self.ldap_update_record(name, txt1, dwTimeStamp=n_days_ago)
1446 # this should make no difference
1447 timestamp1_b = self.dns_update_record(name, txt1).dwTimeStamp
1448 self.assert_timestamps_equal(timestamp1, timestamp1_b)
1450 # no change
1451 timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
1452 self.assert_timestamps_equal(timestamp2, timestamp1)
1453 # also no change
1454 timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
1455 self.assert_timestamps_equal(timestamp1, timestamp2)
1457 # let's introduce another record
1458 txt3 = ['3']
1459 self.ldap_update_record(name, txt2, dwTimeStamp=n_days_ago)
1460 self.ldap_update_record(name, txt1, dwTimeStamp=n_days_ago)
1462 timestamp3 = self.dns_update_record(name, txt3).dwTimeStamp
1463 self.assert_soon_after(timestamp3, current_time)
1465 timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
1466 timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
1468 self.assert_timestamps_equal(timestamp1, n_days_ago)
1469 self.assert_timestamps_equal(timestamp2, n_days_ago)
1471 self.ldap_delete_record(name, txt3)
1472 timestamp3 = self.dns_update_record(name, txt3).dwTimeStamp
1473 self.assert_soon_after(timestamp3, current_time)
1474 timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
1475 timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
1477 self.assert_timestamps_equal(timestamp1, n_days_ago)
1478 self.assert_timestamps_equal(timestamp2, n_days_ago)
1480 txt4 = ['4']
1482 # Because txt1 is static, txt4 is static
1483 self.ldap_update_record(name, txt1, dwTimeStamp=0)
1484 self.ldap_update_record(name, txt2, dwTimeStamp=n_days_ago)
1485 self.ldap_update_record(name, txt3, dwTimeStamp=n_days_ago)
1486 timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
1487 timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
1488 timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
1489 timestamp4 = self.dns_update_record(name, txt4).dwTimeStamp
1491 self.assert_timestamps_equal(timestamp1, 0)
1492 self.assert_timestamps_equal(timestamp2, n_days_ago)
1493 self.assert_timestamps_equal(timestamp3, n_days_ago)
1494 self.assert_timestamps_equal(timestamp4, 0)
1496 longer_ago = n_days_ago // 2
1498 # remove all static records.
1499 self.ldap_delete_record(name, txt4)
1500 self.ldap_update_record(name, txt1, dwTimeStamp=longer_ago)
1501 self.ldap_update_record(name, txt2, dwTimeStamp=n_days_ago)
1502 self.ldap_update_record(name, txt3, dwTimeStamp=n_days_ago)
1503 timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
1504 self.assert_timestamps_equal(timestamp1, longer_ago)
1506 timestamp4 = self.dns_update_record(name, txt4).dwTimeStamp
1507 timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
1508 timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
1509 timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
1511 # Here, although there is no record from which to get the zero
1512 # timestamp, record 4 does it anyway.
1513 self.assert_timestamps_equal(timestamp1, longer_ago)
1514 self.assert_timestamps_equal(timestamp2, n_days_ago)
1515 self.assert_timestamps_equal(timestamp3, n_days_ago)
1516 self.assert_timestamps_equal(timestamp4, 0)
1518 # and now record 1 wants to be static.
1519 self.ldap_update_record(name, txt4, dwTimeStamp=longer_ago)
1520 timestamp4 = self.get_unique_txt_record(name, txt4).dwTimeStamp
1521 self.assert_timestamps_equal(timestamp4, longer_ago)
1522 timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
1523 timestamp4 = self.get_unique_txt_record(name, txt4).dwTimeStamp
1524 self.assert_timestamps_equal(timestamp1, 0)
1525 self.assert_timestamps_equal(timestamp4, longer_ago)
1527 def test_update_aging_enabled_in_no_refresh_window(self):
1528 self._test_update_aging_enabled_n_days_ago(4)
1530 def test_update_aging_enabled_on_no_refresh_boundary(self):
1531 self._test_update_aging_enabled_n_days_ago(7)
1533 def test_update_aging_enabled_in_refresh_window(self):
1534 self._test_update_aging_enabled_n_days_ago(9)
1536 def test_update_aging_enabled_beyond_refresh_window(self):
1537 self._test_update_aging_enabled_n_days_ago(16)
1539 def test_update_aging_enabled_in_eighteenth_century(self):
1540 self._test_update_aging_enabled_n_days_ago(100000)
1542 def test_update_static_stickiness(self):
1543 name = 'test'
1544 A = ['A']
1545 B = ['B']
1546 C = ['C']
1547 D = ['D']
1549 self.set_aging(False)
1550 self.dns_update_record(name, A).dwTimeStamp
1551 self.ldap_update_record(name, B, dwTimeStamp=0)
1552 self.dns_update_record(name, B)
1553 self.dns_update_record(name, C)
1554 ctime = self.get_unique_txt_record(name, C).dwTimeStamp
1555 self.assertEqual(ctime, 0)
1556 btime = self.get_unique_txt_record(name, B).dwTimeStamp
1557 self.assertEqual(btime, 0)
1559 self.ldap_replace_records(name, [])
1561 self.dns_update_record(name, D)
1562 dtime = self.get_unique_txt_record(name, D).dwTimeStamp
1563 self.assertEqual(dtime, 0)
1565 def _test_update_timestamp_weirdness(self, n_days, aging=True):
1566 name = 'test'
1567 A = ['A']
1568 B = ['B']
1570 self.set_aging(aging)
1572 current_time = self.dns_update_record(name, A).dwTimeStamp
1574 # rewind timestamp using ldap
1575 self.ldap_modify_timestamps(name, n_days * -24)
1576 n_days_ago = self.get_unique_txt_record(name, A).dwTimeStamp
1577 time_A = self.dns_update_record(name, A).dwTimeStamp
1578 # that dns_update should have reset the timestamp ONLY if
1579 # aging is on and the old timestamp is > noRefresh period (7
1580 # days)
1581 if n_days > 7 and aging:
1582 self.assert_soon_after(time_A, current_time)
1583 else:
1584 self.assert_timestamps_equal(time_A, n_days_ago)
1586 # add another record, which should have the current timestamp
1587 time_B = self.dns_update_record(name, B).dwTimeStamp
1588 self.assert_soon_after(time_B, current_time)
1590 time_A = self.get_unique_txt_record(name, A).dwTimeStamp
1591 if aging and n_days <= 7:
1592 self.assert_timestamps_equal(time_A, n_days_ago)
1593 else:
1594 self.assert_soon_after(time_A, current_time)
1596 # delete B, try again
1597 self.ldap_delete_record(name, B)
1598 self.ldap_update_record(name, A, dwTimeStamp=n_days_ago)
1600 time_A = self.dns_update_record(name, A).dwTimeStamp
1602 # here we are re-adding the deleted record
1603 time_B = self.dns_update_record(name, B).dwTimeStamp
1604 self.assert_soon_after(time_B, current_time)
1606 time_A = self.get_unique_txt_record(name, A).dwTimeStamp
1607 return n_days_ago, time_A, time_B
1609 def test_update_timestamp_weirdness_no_refresh_no_aging(self):
1610 n_days_ago, time_A, time_B = \
1611 self._test_update_timestamp_weirdness(5, False)
1612 # the timestamp of the SIBLING of the deleted, re-added record
1613 # differs from the sibling of freshly added record.
1614 self.assert_timestamps_equal(time_A, n_days_ago)
1616 def test_update_timestamp_weirdness_no_refresh_aging(self):
1617 n_days_ago, time_A, time_B = \
1618 self._test_update_timestamp_weirdness(5, True)
1619 # the timestamp of the SIBLING of the deleted, re-added record
1620 # differs from the sibling of freshly added record.
1621 self.assert_timestamps_equal(time_A, n_days_ago)
1623 def test_update_timestamp_weirdness_refresh_no_aging(self):
1624 n_days_ago, time_A, time_B = \
1625 self._test_update_timestamp_weirdness(9, False)
1626 self.assert_timestamps_equal(time_A, time_B)
1628 def test_update_timestamp_weirdness_refresh_aging(self):
1629 n_days_ago, time_A, time_B = \
1630 self._test_update_timestamp_weirdness(9, True)
1631 self.assert_timestamps_equal(time_A, time_B)
1633 def test_aging_refresh(self):
1634 name, txt = 'agingtest', ['test txt']
1635 no_refresh = 200
1636 refresh = 160
1637 self.set_zone_int_params(NoRefreshInterval=no_refresh,
1638 RefreshInterval=refresh,
1639 Aging=1)
1640 before_mod = self.dns_update_record(name, txt)
1641 start_time = before_mod.dwTimeStamp
1643 # go back 86 hours, which is in the no-refresh time (but
1644 # wouldn't be if we had stuck to the default of 168).
1645 self.ldap_modify_timestamps(name, -170)
1646 rec = self.dns_update_record(name, txt)
1647 self.assert_timestamps_equal(rec.dwTimeStamp,
1648 start_time - 170)
1650 # back to -202 hours, into the refresh zone
1651 # the update should reset the timestamp to now.
1652 self.ldap_modify_timestamps(name, -32)
1653 rec = self.dns_update_record(name, txt)
1654 self.assert_soon_after(rec.dwTimeStamp, start_time)
1656 # back to -362 hours, beyond the end of the refresh period.
1657 # Actually nothing changes at this time -- we can still
1658 # refresh, but the record is liable for scavenging.
1659 self.ldap_modify_timestamps(name, -160)
1660 rec = self.dns_update_record(name, txt)
1661 self.assert_soon_after(rec.dwTimeStamp, start_time)
1663 def test_add_no_timestamp(self):
1664 # check zero timestamp is implicit
1665 self.set_aging(True)
1666 rec = self.ldap_update_record('ldap', 'test')
1667 self.assertEqual(rec.dwTimeStamp, 0)
1668 rec = self.rpc_update_record('rpc', 'test')
1669 self.assertEqual(rec.dwTimeStamp, 0)
1671 def test_add_zero_timestamp(self):
1672 rec = self.ldap_update_record('ldap', 'test', dwTimeStamp=0)
1673 self.assertEqual(rec.dwTimeStamp, 0)
1674 rec = self.rpc_update_record('rpc', 'test', dwTimeStamp=0)
1675 self.assertEqual(rec.dwTimeStamp, 0)
1677 def test_add_update_timestamp(self):
1678 # LDAP can change timestamp, RPC can't
1679 rec = self.ldap_update_record('ldap', 'test', dwTimeStamp=123456)
1680 self.assertEqual(rec.dwTimeStamp, 123456)
1681 rec = self.rpc_update_record('rpc', 'test', dwTimeStamp=123456)
1682 self.assertEqual(rec.dwTimeStamp, 0)
1683 # second time is a different code path (add vs update)
1684 rec = self.rpc_update_record('rpc', 'test', dwTimeStamp=123456)
1685 self.assertEqual(rec.dwTimeStamp, 0)
1686 # RPC update the one with timestamp, zeroing it.
1687 rec = self.rpc_update_record('ldap', 'test', dwTimeStamp=123456)
1688 self.assertEqual(rec.dwTimeStamp, 0)
1690 def test_add_update_ttl(self):
1691 # RPC *can* set dwTtlSeconds.
1692 rec = self.ldap_update_record('ldap', 'test',
1693 dwTtlSeconds=1234)
1694 self.assertEqual(rec.dwTtlSeconds, 1234)
1695 rec = self.rpc_update_record('rpc', 'test', dwTtlSeconds=1234)
1696 self.assertEqual(rec.dwTtlSeconds, 1234)
1697 # does update work like add?
1698 rec = self.rpc_update_record('rpc', 'test', dwTtlSeconds=4321)
1699 self.assertEqual(rec.dwTtlSeconds, 4321)
1700 rec = self.rpc_update_record('ldap', 'test', dwTtlSeconds=5678)
1701 self.assertEqual(rec.dwTtlSeconds, 5678)
1703 def test_add_update_ttl_serial(self):
1704 # when setting dwTtlSeconds, what happens to serial number?
1705 rec = self.ldap_update_record('ldap', 'test',
1706 dwTtlSeconds=1234,
1707 dwSerial=123)
1708 self.assertEqual(rec.dwTtlSeconds, 1234)
1709 self.assertEqual(rec.dwSerial, 123)
1710 rec = self.rpc_update_record('rpc', 'test', dwTtlSeconds=1234)
1711 self.assertEqual(rec.dwTtlSeconds, 1234)
1712 serial = rec.dwSerial
1713 self.assertLess(serial, 4)
1714 rec = self.rpc_update_record('rpc', 'test', dwTtlSeconds=4321)
1715 self.assertEqual(rec.dwTtlSeconds, 4321)
1716 self.assertEqual(rec.dwSerial, serial + 1)
1717 rec = self.rpc_update_record('ldap', 'test', dwTtlSeconds=5678)
1718 self.assertEqual(rec.dwTtlSeconds, 5678)
1719 self.assertEqual(rec.dwSerial, 124)
1721 def test_add_update_dwFlags(self):
1722 # dwFlags splits into rank and flags.
1723 # according to [MS-DNSP] 2.3.2.2, flags MUST be zero
1724 rec = self.ldap_update_record('ldap', 'test', flags=22222, rank=222)
1725 self.assertEqual(rec.flags, 22222)
1726 self.assertEqual(rec.rank, 222)
1728 rec = self.rpc_update_record('ldap', 'test', dwFlags=3333333)
1729 # rank != 3333333 & 0xff == 213
1730 self.assertEqual(rec.rank, 240) # RPC fixes rank
1731 self.assertEqual(rec.flags, 0)
1733 self.assertRaises(OverflowError,
1734 self.ldap_update_record,
1735 'ldap', 'test', flags=777777777, rank=777)
1737 # reset to no default (rank overflows)
1738 rec = self.ldap_update_record('ldap', 'test', flags=7777, rank=777)
1739 self.assertEqual(rec.flags, 7777)
1740 self.assertEqual(rec.rank, 9)
1742 # DNS update zeros flags, sets rank to 240 (RANK_ZONE)
1743 rec = self.dns_update_record('ldap', 'test', ttl=999)
1744 self.assertEqual(rec.flags, 0)
1745 self.assertEqual(rec.rank, 240)
1747 rec = self.rpc_update_record('ldap', 'test', dwFlags=321)
1748 self.assertEqual(rec.flags, 0)
1749 self.assertEqual(rec.rank, 240)
1751 # RPC adding a new record: fixed rank, zero flags
1752 rec = self.rpc_update_record('ldap', 'test 2', dwFlags=12345)
1753 self.assertEqual(rec.rank, 240)
1754 self.assertEqual(rec.flags, 0)
1756 def test_add_update_dwReserved(self):
1757 # RPC does not change dwReserved.
1758 rec = self.ldap_update_record('ldap', 'test', dwReserved=54321)
1759 self.assertEqual(rec.dwReserved, 54321)
1760 rec = self.rpc_update_record('rpc', 'test', dwReserved=54321)
1761 self.assertEqual(rec.dwReserved, 0)
1762 rec = self.rpc_update_record('rpc', 'test', dwReserved=54321)
1763 self.assertEqual(rec.dwReserved, 0)
1764 rec = self.rpc_update_record('ldap', 'test', dwReserved=12345)
1765 self.assertEqual(rec.dwReserved, 54321)
1767 def test_add_update_dwSerial(self):
1768 # On Windows the RPC record ends up with serial 2, on Samba
1769 # serial 3. Rather than knownfail this, we accept anything
1770 # below 4 (for now).
1771 rec = self.ldap_update_record('ldap', 'test', dwSerial=123)
1772 self.assertEqual(rec.dwSerial, 123)
1773 rec = self.rpc_update_record('rpc', 'test', dwSerial=123)
1774 self.assertLess(rec.dwSerial, 4)
1775 rec = self.rpc_update_record('rpc', 'test', dwSerial=123)
1776 self.assertLess(rec.dwSerial, 4)
1777 rec = self.dns_update_record('rpc', 'test')
1778 self.assertLess(rec.dwSerial, 4)
1779 rec = self.dns_update_record('dns-0', 'test')
1780 self.assertLess(rec.dwSerial, 5)
1782 rec = self.dns_update_record('ldap', 'test')
1783 self.assertEqual(rec.dwSerial, 123)
1784 rec = self.rpc_update_record('ldap', 'test', dwSerial=123)
1785 self.assertEqual(rec.dwSerial, 123)
1786 rec = self.ldap_update_record('ldap', 'test', dwSerial=12)
1787 self.assertEqual(rec.dwSerial, 12)
1788 # when we dns-updated ldap/test, we alerted Windows to 123 as
1789 # a high water mark for the zone. (even though we have since
1790 # dropped the serial to 12, 123 is the base serial for new
1791 # records).
1792 rec = self.dns_update_record('dns', 'test')
1793 self.assertEqual(rec.dwSerial, 124)
1794 rec = self.dns_update_record('dns2', 'test')
1795 self.assertEqual(rec.dwSerial, 125)
1796 rec = self.rpc_update_record('rpc2', 'test')
1797 self.assertEqual(rec.dwSerial, 126)
1798 rec = self.dns_update_record('dns', 'test 2')
1799 self.assertEqual(rec.dwSerial, 127)
1801 def test_add_update_dwSerial_2(self):
1802 # On Samba the RPC update resets the serial to a low number,
1803 # while Windows leaves it high.
1804 rec = self.ldap_update_record('ldap', 'test', dwSerial=123)
1805 self.assertEqual(rec.dwSerial, 123)
1806 rec = self.rpc_update_record('ldap', 'test', dwSerial=321)
1807 self.assertEqual(rec.dwSerial, 123)
1808 rec = self.dns_update_record('ldap', 'test')
1809 self.assertEqual(rec.dwSerial, 123)
1811 def test_rpc_update_disparate_types(self):
1812 """Can we use update to replace a TXT with an AAAA?"""
1813 name = 'x'
1814 old = TXTRecord("x")
1815 new = ARecord("127.0.0.111")
1816 self.rpc_replace(name, None, old)
1817 recs = self.ldap_get_records(name)
1818 self.assertEqual(len(recs), 1)
1819 self.assertEqual(recs[0].wType, old.wType)
1821 self.rpc_replace(name, old, new)
1822 recs = self.ldap_get_records(name)
1823 self.assertEqual(len(recs), 1)
1824 self.assertEqual(recs[0].wType, new.wType)
1826 def test_add_update_many(self):
1827 # Samba fails often in this set, but we want to see how it
1828 # goes further down, so we print the problems and defer the
1829 # failure.
1830 failures = 0
1831 total = 0
1833 def _defer_wrap(f):
1834 def _defer(*args):
1835 nonlocal failures, total
1836 total += 1
1837 try:
1838 f(*args)
1839 except self.failureException as e:
1840 from traceback import format_stack
1841 print(f"{format_stack()[-2]} {e}\n")
1842 failures += 1
1843 return _defer
1845 defer_assertEqual = _defer_wrap(self.assertEqual)
1846 defer_assert_timestamp_in_ballpark = \
1847 _defer_wrap(self.assert_timestamp_in_ballpark)
1849 self.set_aging(False)
1850 rec = self.ldap_update_record('ldap', 'test',
1851 version=11,
1852 rank=22,
1853 flags=33,
1854 dwSerial=44,
1855 dwTtlSeconds=55,
1856 dwReserved=66,
1857 dwTimeStamp=77)
1859 self.assertEqual(rec.version, 5) # disobeys request
1860 self.assertEqual(rec.rank, 22)
1861 self.assertEqual(rec.flags, 33)
1862 self.assertEqual(rec.dwSerial, 44)
1863 self.assertEqual(rec.dwTtlSeconds, 55)
1864 self.assertEqual(rec.dwReserved, 66)
1865 self.assertEqual(rec.dwTimeStamp, 77)
1866 # DNS updates first
1867 rec = self.dns_update_record('ldap', 'test', ttl=999)
1868 self.assertEqual(rec.version, 5)
1869 self.assertEqual(rec.rank, 240) # rank gets fixed by DNS update
1870 defer_assertEqual(rec.flags, 0) # flags gets fixed
1871 defer_assertEqual(rec.dwSerial, 45) # serial increments
1872 self.assertEqual(rec.dwTtlSeconds, 999) # TTL set
1873 defer_assertEqual(rec.dwReserved, 0) # reserved fixed
1874 defer_assert_timestamp_in_ballpark(rec) # changed on Windows ?!
1876 self.set_aging(True)
1877 rec = self.dns_update_record('ldap', 'test', ttl=1111)
1878 self.assertEqual(rec.version, 5)
1879 self.assertEqual(rec.rank, 240)
1880 defer_assertEqual(rec.flags, 0)
1881 defer_assertEqual(rec.dwSerial, 46)
1882 self.assertEqual(rec.dwTtlSeconds, 1111) # TTL set
1883 defer_assertEqual(rec.dwReserved, 0)
1884 self.assert_timestamp_in_ballpark(rec)
1886 # RPC update
1887 rec = self.rpc_update_record('ldap', 'test',
1888 version=111,
1889 dwFlags=333,
1890 dwSerial=444,
1891 dwTtlSeconds=555,
1892 dwReserved=666,
1893 dwTimeStamp=777)
1895 self.assertEqual(rec.version, 5) # no change
1896 self.assertEqual(rec.rank, 240) # no change
1897 defer_assertEqual(rec.flags, 0) # no change
1898 defer_assertEqual(rec.dwSerial, 47) # Serial increments
1899 self.assertEqual(rec.dwTtlSeconds, 555) # TTL set
1900 defer_assertEqual(rec.dwReserved, 0) # no change
1901 self.assertEqual(rec.dwTimeStamp, 0) # timestamp zeroed
1903 # RPC update, using default values
1904 rec = self.rpc_update_record('ldap', 'test')
1905 self.assertEqual(rec.version, 5)
1906 self.assertEqual(rec.rank, 240)
1907 defer_assertEqual(rec.flags, 0)
1908 defer_assertEqual(rec.dwSerial, 48) # serial increments
1909 self.assertEqual(rec.dwTtlSeconds, 900) # TTL changed
1910 defer_assertEqual(rec.dwReserved, 0)
1911 self.assertEqual(rec.dwTimeStamp, 0)
1913 self.set_aging(False)
1914 rec = self.dns_update_record('ldap', 'test', ttl=888)
1915 self.assertEqual(rec.version, 5)
1916 self.assertEqual(rec.rank, 240)
1917 defer_assertEqual(rec.flags, 0)
1918 defer_assertEqual(rec.dwSerial, 49) # serial increments
1919 self.assertEqual(rec.dwTtlSeconds, 888) # TTL set
1920 defer_assertEqual(rec.dwReserved, 0)
1921 self.assertEqual(rec.dwTimeStamp, 0) # timestamp stays zero
1923 if failures:
1924 self.fail(f"failed {failures}/{total} deferred assertions")
1926 def test_static_record_dynamic_update(self):
1927 """Add a static record, then a dynamic record.
1928 The dynamic record should have a timestamp set."""
1929 name = 'test'
1930 txt = ['static txt']
1931 txt2 = ['dynamic txt']
1932 self.set_aging(True)
1933 rec = self.ldap_update_record(name, txt, dwTimeStamp=0)
1934 rec2 = self.dns_update_record(name, txt2)
1935 self.assert_timestamp_in_ballpark(rec2)
1936 ts2 = rec2.dwTimeStamp
1937 # update the first record. It should stay static (timestamp 0)
1938 rec = self.dns_update_record(name, txt)
1939 self.assertEqual(rec.dwTimeStamp, 0)
1940 # and rec2 should be unchanged.
1941 self.assertEqual(rec2.dwTimeStamp, ts2)
1943 def test_dynamic_record_static_update(self):
1944 name = 'agingtest'
1945 txt1 = ['dns update before']
1946 txt2 = ['ldap update']
1947 txt3 = ['dns update after']
1948 self.set_aging(True)
1950 self.dns_update_record(name, txt1)
1951 self.ldap_update_record(name, txt2)
1952 self.dns_update_record(name, txt3)
1954 recs = self.get_rpc_records(name)
1955 for r in recs:
1956 d = [x.str for x in r.data.str]
1957 if d == txt1:
1958 self.assertNotEqual(r.dwTimeStamp, 0)
1959 elif d == txt2:
1960 self.assertEqual(r.dwTimeStamp, 0)
1961 elif d == txt3:
1962 self.assertNotEqual(r.dwTimeStamp, 0)
1964 def test_tombstone_in_hours_and_nttime(self):
1965 # Until now Samba has measured tombstone timestamps in hours,
1966 # not ten-millionths of a second. After now, we want Samba to
1967 # handle both.
1969 nh, oh, nn, on, on0, onf, nn0, nnf, _1601 = 'abcdefgij'
1970 now_hours = dsdb_dns.unix_to_dns_timestamp(int(time.time()))
1971 old_hours = now_hours - 24 * 90
1972 now_nttime = dsdb_dns.dns_timestamp_to_nt_time(now_hours)
1973 old_nttime = dsdb_dns.dns_timestamp_to_nt_time(old_hours)
1974 # calculations on hours might be based on the lower 32 bits,
1975 # so we test with these forced to extremes (the maximum change
1976 # is 429 seconds in NTTIME).
1977 old_nttime0 = old_nttime & 0xffffffff00000000
1978 old_nttimef = old_nttime | 0xffffffff
1979 now_nttime0 = now_nttime & 0xffffffff00000000
1980 now_nttimef = now_nttime | 0xffffffff
1981 self.dns_tombstone(nh, epoch_nttime=now_hours)
1982 self.dns_tombstone(oh, epoch_nttime=old_hours)
1983 self.dns_tombstone(nn, epoch_nttime=now_nttime)
1984 self.dns_tombstone(on, epoch_nttime=old_nttime)
1985 self.dns_tombstone(nn0, epoch_nttime=now_nttime0)
1986 self.dns_tombstone(nnf, epoch_nttime=now_nttimef)
1987 self.dns_tombstone(on0, epoch_nttime=old_nttime0)
1988 self.dns_tombstone(onf, epoch_nttime=old_nttimef)
1989 # this is our (arbitrary) threshold that will make us think in
1990 # NTTIME, not hours.
1991 self.dns_tombstone(_1601, epoch_nttime=(10 * 1000 * 1000 + 1))
1993 try:
1994 file_samdb = get_file_samdb()
1995 except ldb.LdbError as e:
1996 raise AssertionError(
1997 f"failing because '{e}': this is Windows?") from None
1998 dsdb._dns_delete_tombstones(file_samdb)
2000 # nh and nn should not be deleted
2001 for name in nh, nn, nn0, nnf:
2002 recs = self.ldap_get_records(name)
2003 self.assertEqual(len(recs), 1)
2004 self.assert_tombstoned(name, timestamp=False)
2006 # oh and on should be GONE
2007 for name in oh, on, on0, onf, _1601:
2008 recs = self.ldap_get_records(name)
2009 self.assertEqual(len(recs), 0)
2011 def test_dns_query_for_tombstoned_results(self):
2012 # This one fails on Windows, because the dns cache holds B
2013 # after it has been tombstoned behind its back.
2014 A = 'a'
2015 B = 'b'
2016 self.dns_tombstone(A)
2017 self.assert_tombstoned(A)
2018 r = self.dns_query(A, qtype=dns.DNS_QTYPE_TXT)
2019 self.assertEqual(r.ancount, 0)
2021 self.dns_update_record(B, B)
2022 self.dns_tombstone(B)
2023 self.assert_tombstoned(B)
2024 r = self.dns_query(B, qtype=dns.DNS_QTYPE_TXT)
2025 self.assertEqual(r.ancount, 0)
2027 def test_basic_scavenging(self):
2028 # NOTE: This one fails on Windows, because the RPC call to
2029 # prompt scavenging is not immediate. On Samba, in the
2030 # testenv, we don't have the RPC call but we can connect to
2031 # the database directly.
2033 # just to be sure we have the right limits.
2034 self.set_zone_int_params(NoRefreshInterval=168,
2035 RefreshInterval=168,
2036 Aging=1)
2038 ts1, ts2, ts3, ts4, ts5, ts6 = ('1', '2', '3', '4', '5', '6')
2039 self.dns_update_record(ts1, ts1)
2040 self.dns_update_record(ts2, ts2)
2041 # ts2 is tombstoned and timestamped in 1981
2042 self.dns_tombstone(ts2)
2043 # ts3 is tombstoned and timestamped in the future
2044 self.dns_tombstone(ts3, epoch_hours=(DNS_TIMESTAMP_2101 - 1))
2045 # ts4 is tombstoned and timestamped in the past
2046 self.dns_tombstone(ts4, epoch_hours=1111111)
2047 # ts5 is tombstoned in the past and timestamped in the future
2048 self.dns_tombstone(ts5, epoch_hours=5555555, epoch_nttime=int(1e10))
2050 # ts2 and ts3 should now be tombstoned.
2051 self.assert_tombstoned(ts2)
2052 self.assert_tombstoned(ts3)
2054 # let's un-tombstone ts2
2055 # ending up with dnsTombstoned: FALSE in Samba
2056 # and no dNSTombstoned in Windows.
2057 self.dns_update_record(ts2, "ts2 untombstoned")
2058 ts2_node = self.get_one_node(ts2)
2059 ts2_tombstone = ts2_node.get("dNSTombstoned")
2060 if ts2_tombstone is not None:
2061 self.assertEqual(ts2_tombstone[0], b"FALSE")
2063 self.assert_tombstoned(ts2, tombstoned=False)
2065 r = self.dns_update_record(ts6, ts6)
2067 # put some records into the death zone.
2068 self.ldap_modify_timestamps(ts1, -15 * 24)
2069 self.ldap_modify_timestamps(ts2, -14 * 24 - 2)
2070 self.ldap_modify_timestamps(ts6, -14 * 24 + 2)
2072 # ts1 will be saved by this record
2073 self.dns_update_record(ts1, "another record")
2075 try:
2076 # Tell the server to clean-up records.
2077 # This is how it *should* work on Windows:
2078 self.rpc_conn.DnssrvOperation2(
2079 dnsserver.DNS_CLIENT_VERSION_LONGHORN,
2081 SERVER_IP,
2082 None,
2084 "StartScavenging",
2085 dnsserver.DNSSRV_TYPEID_NULL,
2086 None)
2087 # Samba won't get here (NOT_IMPLEMENTED error)
2088 # wait for Windows to do its cleanup.
2089 time.sleep(2)
2090 except WERRORError as e:
2091 if e.args[0] == werror.WERR_CALL_NOT_IMPLEMENTED:
2092 # This is the Samba way, talking to the file directly,
2093 # as if we were the server process. The direct
2094 # connection is needed because the tombstoning search
2095 # involves a magic system only filter.
2096 file_samdb = get_file_samdb()
2097 dsdb._scavenge_dns_records(file_samdb)
2098 dsdb._dns_delete_tombstones(file_samdb)
2099 else:
2100 raise
2102 # Now what we should have:
2103 # ts1: alive: the old record is deleted, the new one not.
2104 # ts2: tombstoned
2105 # ts3: tombstoned
2106 # ts4: deleted. gone.
2107 # ts5: deleted. timestamp affects tombstoning, but not deletion.
2108 # ts6: alive
2110 # We order our assertions to make the windows test
2111 # fail as late as possible (on ts4, ts5, ts2).
2112 r = self.get_unique_txt_record(ts1, ["another record"])
2113 self.assertIsNotNone(r)
2114 r = self.get_unique_txt_record(ts6, [ts6])
2115 self.assertIsNotNone(r)
2117 self.assert_tombstoned(ts3)
2119 n = self.get_one_node(ts4)
2120 self.assertIsNone(n)
2121 n = self.get_one_node(ts5)
2122 self.assertIsNone(n)
2124 self.assert_tombstoned(ts2)
2126 def test_samba_scavenging(self):
2127 # We expect this one to fail on Windows, because scavenging
2128 # and tombstoning cannot be performed on demand.
2130 try:
2131 file_samdb = get_file_samdb()
2132 except ldb.LdbError as e:
2133 raise AssertionError(
2134 f"failing because '{e}': this is Windows?") from None
2136 # let's try different limits.
2137 self.set_zone_int_params(NoRefreshInterval=30,
2138 RefreshInterval=20,
2139 Aging=1)
2141 now = dsdb_dns.unix_to_dns_timestamp(int(time.time()))
2143 A, B, C, D = 'ABCD'
2144 # A has current time
2145 # B has safe, non-updateable time
2146 # C has safe time
2147 # D is scavengeable
2148 atime = self.dns_update_record(A, A).dwTimeStamp
2149 btime = self.ldap_update_record(B, B, dwTimeStamp=now-20).dwTimeStamp
2150 ctime = self.ldap_update_record(C, C, dwTimeStamp=now-40).dwTimeStamp
2151 dtime = self.ldap_update_record(D, D, dwTimeStamp=now-60).dwTimeStamp
2152 self.assert_soon_after(atime, now)
2153 self.assert_timestamps_equal(btime, now-20)
2154 self.assert_timestamps_equal(ctime, now-40)
2155 self.assert_timestamps_equal(dtime, now-60)
2157 dsdb._scavenge_dns_records(file_samdb)
2159 # D should be gone (tombstoned)
2160 r = self.get_unique_txt_record(D, D)
2161 self.assertIsNone(r)
2162 r = self.dns_query(D, qtype=dns.DNS_QTYPE_TXT)
2163 self.assertEqual(r.ancount, 0)
2164 recs = self.ldap_get_records(D)
2165 self.assertEqual(len(recs), 1)
2166 self.assert_tombstoned(D)
2168 # others unchanged.
2169 atime = self.get_unique_txt_record(A, A).dwTimeStamp
2170 btime = self.get_unique_txt_record(B, B).dwTimeStamp
2171 ctime = self.get_unique_txt_record(C, C).dwTimeStamp
2172 self.assert_soon_after(atime, now)
2173 self.assert_timestamps_equal(btime, now-20)
2174 self.assert_timestamps_equal(ctime, now-40)
2176 btime = self.dns_update_record(B, B).dwTimeStamp
2177 ctime = self.dns_update_record(C, C).dwTimeStamp
2178 self.assert_timestamps_equal(btime, now-40)
2179 self.assert_soon_after(ctime, now)
2181 # after this, D *should* still be a tombstone, because its
2182 # tombstone timestamp is not very old.
2183 dsdb._dns_delete_tombstones(file_samdb)
2184 recs = self.ldap_get_records(D)
2185 self.assertEqual(len(recs), 1)
2186 self.assert_tombstoned(D)
2188 # Let's delete C using rpc, and ensure it survives dns_delete_tombstones
2189 self.rpc_delete_txt(C, C)
2190 recs = self.ldap_get_records(C)
2191 self.assertEqual(len(recs), 1)
2192 self.assert_tombstoned(C)
2193 dsdb._dns_delete_tombstones(file_samdb)
2194 recs = self.ldap_get_records(C)
2195 self.assertEqual(len(recs), 1)
2196 self.assert_tombstoned(C)
2198 # now let's wind A and B back to either side of the two week
2199 # threshold. A should survive, B should not.
2200 self.dns_tombstone(A, (now - 166))
2201 self.dns_tombstone(B, (now - 170))
2202 dsdb._dns_delete_tombstones(file_samdb)
2204 recs = self.ldap_get_records(A)
2205 self.assertEqual(len(recs), 1)
2206 self.assert_tombstoned(A)
2208 recs = self.ldap_get_records(B)
2209 self.assertEqual(len(recs), 0)
2211 def _test_A_and_AAAA_records(self, A, B, a_days, b_days, aging):
2212 self.set_aging(aging)
2214 name = 'aargh'
2215 now = dsdb_dns.unix_to_dns_timestamp(int(time.time()))
2216 a_initial = now - 24 * a_days
2217 b_initial = now - 24 * b_days
2219 self.dns_update_non_text(name, A)
2220 self.ldap_modify_timestamps(name, a_days * -24)
2222 rec_a = self.get_unique_ip_record(name, A)
2223 rec_b = self.add_ip_record(name, B, dwTimeStamp=b_initial)
2225 self.assert_timestamps_equal(rec_a, a_initial)
2226 self.assert_timestamps_equal(rec_b, b_initial)
2228 # touch the A record.
2229 self.dns_update_non_text(name, A)
2231 # check the A timestamp, depending on norefresh
2232 rec_a = self.get_unique_ip_record(name, A)
2233 if aging and a_days > 7:
2234 time_a = now
2235 self.assert_soon_after(rec_a, now)
2236 elif a_days > 7:
2237 # when we have NO aging and are in the refresh window, the
2238 # timestamp now reads as a_initial, but will become now
2239 # after we manipulate B for a bit.
2240 time_a = now
2241 self.assert_timestamps_equal(rec_a, a_initial)
2242 else:
2243 time_a = a_initial
2244 self.assert_timestamps_equal(rec_a, a_initial)
2246 # B timestamp should be unchanged?
2247 rec_b = self.get_unique_ip_record(name, B)
2248 self.assert_timestamps_equal(rec_b, b_initial)
2250 # touch the B record.
2251 self.dns_update_non_text(name, B)
2253 # check the B timestamp
2254 rec_b = self.get_unique_ip_record(name, B)
2255 if not aging:
2256 self.windows_variation(
2257 self.assert_soon_after, rec_b, now,
2258 msg="windows updates non-aging, samba does not")
2259 else:
2260 self.assert_soon_after(rec_b, now)
2262 # rewind B
2263 rec_b = self.add_ip_record(name, B, dwTimeStamp=b_initial)
2265 # NOW rec A might have changed! with no aging, and out of refresh.
2266 rec_a = self.get_unique_ip_record(name, A)
2267 self.assert_timestamps_equal(rec_a, time_a)
2269 self.dns_update_non_text(name, A)
2271 rec_a = self.get_unique_ip_record(name, B)
2272 self.assert_timestamps_equal(rec_b, b_initial)
2274 # now delete A
2275 _, wtype = guess_wtype(A)
2276 self.ldap_delete_record(name, A, wtype=wtype)
2278 # re-add it
2279 self.dns_update_non_text(name, A)
2281 rec_a = self.get_unique_ip_record(name, A)
2282 self.assert_soon_after(rec_a, now)
2284 rec_b = self.get_unique_ip_record(name, B)
2285 self.assert_timestamps_equal(rec_b, b_initial)
2287 def test_A_5_days_AAAA_5_days_aging(self):
2288 self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR, 5, 5, aging=True)
2290 def test_A_5_days_AAAA_5_days_no_aging(self):
2291 self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR, 5, 5, aging=False)
2293 def test_A_5_days_AAAA_10_days_aging(self):
2294 self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR, 5, 10, aging=True)
2296 def test_A_5_days_AAAA_10_days_no_aging(self):
2297 self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR, 5, 10, aging=False)
2299 def test_A_10_days_AAAA_5_days_aging(self):
2300 self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR, 10, 5, aging=True)
2302 def test_A_10_days_AAAA_5_days_no_aging(self):
2303 self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR, 10, 5, aging=False)
2305 def test_A_10_days_AAAA_9_days_aging(self):
2306 self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR, 10, 9, aging=True)
2308 def test_A_9_days_AAAA_10_days_no_aging(self):
2309 self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR, 9, 10, aging=False)
2311 def test_A_20_days_AAAA_2_days_aging(self):
2312 self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR, 20, 2, aging=True)
2314 def test_A_6_days_AAAA_40_days_no_aging(self):
2315 self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR, 6, 40, aging=False)
2317 def test_A_5_days_A_5_days_aging(self):
2318 self._test_A_and_AAAA_records(IPv4_ADDR, IPv4_ADDR_2, 5, 5, aging=True)
2320 def test_A_5_days_A_10_days_no_aging(self):
2321 self._test_A_and_AAAA_records(IPv4_ADDR, IPv4_ADDR_2, 5, 10, aging=False)
2323 def test_AAAA_5_days_AAAA_6_days_aging(self):
2324 self._test_A_and_AAAA_records(IPv6_ADDR, IPv6_ADDR_2, 5, 6, aging=True)
2326 def test_AAAA_5_days_AAAA_6_days_no_aging(self):
2327 self._test_A_and_AAAA_records(IPv6_ADDR, IPv6_ADDR_2, 5, 6, aging=False)
2329 def _test_multi_records_delete(self, aging):
2330 # Batch deleting a type doesn't update other types timestamps.
2331 self.set_aging(aging)
2333 name = 'aargh'
2334 now = dsdb_dns.unix_to_dns_timestamp(int(time.time()))
2336 back_5_days = now - 5 * 24
2337 back_10_days = now - 10 * 24
2338 back_25_days = now - 25 * 24
2340 ip4s = {
2341 '1.1.1.1': now,
2342 '2.2.2.2': back_5_days,
2343 '3.3.3.3': back_10_days,
2345 ip6s = {
2346 '::1': now,
2347 '::2': back_5_days,
2348 '::3': back_25_days,
2351 txts = {
2352 '1': now,
2353 '2': back_5_days,
2354 '3': back_25_days,
2357 # For windows, if we don't DNS update something, it won't know
2358 # there's anything.
2359 self.dns_update_record(name, '3')
2361 for k, v in ip4s.items():
2362 r = self.add_ip_record(name, k, wtype=dns.DNS_QTYPE_A, dwTimeStamp=v)
2364 for k, v in ip6s.items():
2365 r = self.add_ip_record(name, k, wtype=dns.DNS_QTYPE_AAAA, dwTimeStamp=v)
2367 for k, v in txts.items():
2368 r = self.ldap_update_record(name, k, dwTimeStamp=v)
2370 self.dns_delete_type(name, dnsp.DNS_TYPE_A)
2372 r = self.dns_query(name, dns.DNS_QTYPE_A)
2373 self.assertEqual(r.ancount, 0)
2375 r = self.dns_query(name, dns.DNS_QTYPE_TXT)
2376 self.assertEqual(r.ancount, 3)
2377 rset = set(x.rdata.txt.str[0] for x in r.answers)
2378 self.assertEqual(rset, set(txts))
2380 r = self.dns_query(name, dns.DNS_QTYPE_AAAA)
2381 self.assertEqual(r.ancount, 3)
2382 rset = set(ipv6_normalise(x.rdata) for x in r.answers)
2383 self.assertEqual(rset, set(ip6s))
2385 recs = self.ldap_get_records(name)
2386 self.assertEqual(len(recs), 6)
2387 for r in recs:
2388 if r.wType == dns.DNS_QTYPE_AAAA:
2389 k = ipv6_normalise(r.data)
2390 expected = ip6s[k]
2391 elif r.wType == dns.DNS_QTYPE_TXT:
2392 k = r.data.str[0]
2393 expected = txts[k]
2394 else:
2395 self.fail(f"unexpected wType {r.wType}")
2397 self.assert_timestamps_equal(r.dwTimeStamp, expected)
2399 def test_multi_records_delete_aging(self):
2400 self._test_multi_records_delete(True)
2402 def test_multi_records_delete_no_aging(self):
2403 self._test_multi_records_delete(False)
2405 def _test_dns_delete_times(self, n_days, aging=True):
2406 # In these tests, Windows replaces the records with
2407 # tombstones, while Samba just removes them. Both are
2408 # reasonable approaches (there is no reanimation pathway for
2409 # tombstones), but this means self.ldap_get_records() gets
2410 # different numbers for each. So we use
2411 # self.ldap_get_non_tombstoned_record().
2412 name = 'test'
2413 A = ['A']
2414 B = ['B']
2415 C = ['C']
2416 D = ['D']
2417 self.set_aging(aging)
2418 now = dsdb_dns.unix_to_dns_timestamp(int(time.time()))
2419 n_days_ago = max(now - n_days * 24, 0)
2421 self.dns_update_record(name, A)
2422 self.ldap_update_record(name, A, dwTimeStamp=n_days_ago)
2423 self.ldap_update_record(name, B, dwTimeStamp=n_days_ago)
2424 self.ldap_update_record(name, C, dwTimeStamp=n_days_ago)
2425 self.dns_update_record(name, D)
2426 r = self.dns_query(name, dns.DNS_QTYPE_TXT)
2427 rset = set(x.rdata.txt.str[0] for x in r.answers)
2428 self.assertEqual(rset, set('ABCD'))
2430 atime = self.get_unique_txt_record(name, A).dwTimeStamp
2431 btime = self.get_unique_txt_record(name, B).dwTimeStamp
2432 ctime = self.get_unique_txt_record(name, C).dwTimeStamp
2433 dtime = self.get_unique_txt_record(name, D).dwTimeStamp
2434 recs = self.ldap_get_records(name)
2435 self.assertEqual(len(recs), 4)
2436 r = self.dns_query(name, dns.DNS_QTYPE_TXT)
2437 rset = set(x.rdata.txt.str[0] for x in r.answers)
2438 self.assertEqual(rset, set('ABCD'))
2440 self.assert_timestamps_equal(dtime, self.get_unique_txt_record(name, D))
2442 self.dns_delete(name, D)
2443 self.assert_timestamps_equal(atime, self.get_unique_txt_record(name, A))
2444 self.assert_timestamps_equal(btime, self.get_unique_txt_record(name, B))
2445 self.assert_timestamps_equal(ctime, self.get_unique_txt_record(name, C))
2446 recs = self.ldap_get_non_tombstoned_records(name)
2447 self.assertEqual(len(recs), 3)
2448 r = self.dns_query(name, dns.DNS_QTYPE_TXT)
2449 rset = set(x.rdata.txt.str[0] for x in r.answers)
2450 self.assertEqual(rset, set('ABC'))
2452 self.rpc_delete_txt(name, C)
2453 self.assert_timestamps_equal(atime, self.get_unique_txt_record(name, A))
2454 self.assert_timestamps_equal(btime, self.get_unique_txt_record(name, B))
2455 recs = self.ldap_get_non_tombstoned_records(name)
2456 self.assertEqual(len(recs), 2)
2457 r = self.dns_query(name, dns.DNS_QTYPE_TXT)
2458 rset = set(x.rdata.txt.str[0] for x in r.answers)
2459 self.assertEqual(rset, set('AB'))
2461 self.dns_delete(name, A)
2462 self.assert_timestamps_equal(btime, self.get_unique_txt_record(name, B))
2463 recs = self.ldap_get_records(name)
2464 self.assertEqual(len(recs), 1)
2465 r = self.dns_query(name, dns.DNS_QTYPE_TXT)
2466 rset = set(x.rdata.txt.str[0] for x in r.answers)
2467 self.assertEqual(rset, {'B'})
2469 self.dns_delete(name, B)
2470 recs = self.ldap_get_non_tombstoned_records(name)
2471 # Windows leaves the node with zero records. Samba ends up
2472 # with a tombstone.
2473 self.assertEqual(len(recs), 0)
2474 r = self.dns_query(name, dns.DNS_QTYPE_TXT)
2475 rset = set(x.rdata.txt.str[0] for x in r.answers)
2476 self.assertEqual(len(rset), 0)
2478 def test_dns_delete_times_5_days_aging(self):
2479 self._test_dns_delete_times(5, True)
2481 def test_dns_delete_times_11_days_aging(self):
2482 self._test_dns_delete_times(11, True)
2484 def test_dns_delete_times_366_days_aging(self):
2485 self._test_dns_delete_times(366, True)
2487 def test_dns_delete_times_static_aging(self):
2488 self._test_dns_delete_times(1e10, True)
2490 def test_dns_delete_times_5_days_no_aging(self):
2491 self._test_dns_delete_times(5, False)
2493 def test_dns_delete_times_11_days_no_aging(self):
2494 self._test_dns_delete_times(11, False)
2496 def test_dns_delete_times_366_days_no_aging(self):
2497 self._test_dns_delete_times(366, False)
2499 def test_dns_delete_times_static_no_aging(self):
2500 self._test_dns_delete_times(1e10, False)
2502 def _test_dns_delete_simple(self, a_days, b_days, aging=True, touch=False):
2503 # Here we show that with aging enabled, the timestamp of
2504 # sibling records is *not* modified when a record is deleted.
2506 # With aging disabled, it *is* modified, if the dns server has
2507 # seen it updated before ldap set the time (that is, probably
2508 # the dns server overwrites AD). This happens even if AD
2509 # thinks the record is static.
2510 name = 'test'
2511 A = ['A']
2512 B = ['B']
2513 self.set_aging(aging)
2514 now = dsdb_dns.unix_to_dns_timestamp(int(time.time()))
2515 a_days_ago = max(now - a_days * 24, 0)
2516 b_days_ago = max(now - b_days * 24, 0)
2518 if touch:
2519 self.dns_update_record(name, A)
2520 self.dns_update_record(name, B)
2522 self.ldap_update_record(name, A, dwTimeStamp=a_days_ago)
2523 self.ldap_update_record(name, B, dwTimeStamp=b_days_ago)
2525 atime = self.get_unique_txt_record(name, A).dwTimeStamp
2527 self.dns_delete(name, B)
2528 if not aging and touch:
2529 # this resets the timestamp even if it is a static record.
2530 self.assert_soon_after(self.get_unique_txt_record(name, A), now)
2531 else:
2532 self.assert_timestamps_equal(self.get_unique_txt_record(name, A), atime)
2534 def test_dns_delete_simple_2_3_days_aging(self):
2535 self._test_dns_delete_simple(2, 3, True)
2537 def test_dns_delete_simple_2_3_days_no_aging(self):
2538 self._test_dns_delete_simple(2, 3, False)
2540 def test_dns_delete_simple_2_13_days_aging(self):
2541 self._test_dns_delete_simple(2, 13, True)
2543 def test_dns_delete_simple_2_13_days_no_aging(self):
2544 self._test_dns_delete_simple(2, 13, False)
2546 def test_dns_delete_simple_12_13_days_aging(self):
2547 self._test_dns_delete_simple(12, 13, True)
2549 def test_dns_delete_simple_12_13_days_no_aging(self):
2550 self._test_dns_delete_simple(12, 13, False)
2552 def test_dns_delete_simple_112_113_days_aging(self):
2553 self._test_dns_delete_simple(112, 113, True)
2555 def test_dns_delete_simple_112_113_days_no_aging(self):
2556 self._test_dns_delete_simple(112, 113, False)
2558 def test_dns_delete_simple_0_113_days_aging(self):
2559 # 1e9 hours ago evaluates to 0, i.e static
2560 self._test_dns_delete_simple(1e9, 113, True)
2562 def test_dns_delete_simple_0_113_days_no_aging(self):
2563 self._test_dns_delete_simple(1e9, 113, False)
2565 def test_dns_delete_simple_0_0_days_aging(self):
2566 self._test_dns_delete_simple(1e9, 1e9, True)
2568 def test_dns_delete_simple_0_0_days_no_aging(self):
2569 self._test_dns_delete_simple(1e9, 1e9, False)
2571 def test_dns_delete_simple_10_0_days_aging(self):
2572 self._test_dns_delete_simple(10, 1e9, True)
2574 def test_dns_delete_simple_10_0_days_no_aging(self):
2575 self._test_dns_delete_simple(10, 1e9, False)
2577 def test_dns_delete_simple_2_3_days_aging_touch(self):
2578 self._test_dns_delete_simple(2, 3, True, True)
2580 def test_dns_delete_simple_2_3_days_no_aging_touch(self):
2581 self._test_dns_delete_simple(2, 3, False, True)
2583 def test_dns_delete_simple_2_13_days_aging_touch(self):
2584 self._test_dns_delete_simple(2, 13, True, True)
2586 def test_dns_delete_simple_2_13_days_no_aging_touch(self):
2587 self._test_dns_delete_simple(2, 13, False, True)
2589 def test_dns_delete_simple_12_13_days_aging_touch(self):
2590 self._test_dns_delete_simple(12, 13, True, True)
2592 def test_dns_delete_simple_12_13_days_no_aging_touch(self):
2593 self._test_dns_delete_simple(12, 13, False, True)
2595 def test_dns_delete_simple_112_113_days_aging_touch(self):
2596 self._test_dns_delete_simple(112, 113, True, True)
2598 def test_dns_delete_simple_112_113_days_no_aging_touch(self):
2599 self._test_dns_delete_simple(112, 113, False, True)
2601 def test_dns_delete_simple_0_113_days_aging_touch(self):
2602 # 1e9 hours ago evaluates to 0, i.e static
2603 self._test_dns_delete_simple(1e9, 113, True, True)
2605 def test_dns_delete_simple_0_113_days_no_aging_touch(self):
2606 self._test_dns_delete_simple(1e9, 113, False, True)
2608 def test_dns_delete_simple_0_0_days_aging_touch(self):
2609 self._test_dns_delete_simple(1e9, 1e9, True, True)
2611 def test_dns_delete_simple_0_0_days_no_aging_touch(self):
2612 self._test_dns_delete_simple(1e9, 1e9, False, True)
2614 def test_dns_delete_simple_10_0_days_aging_touch(self):
2615 self._test_dns_delete_simple(10, 1e9, True, True)
2617 def test_dns_delete_simple_10_0_days_no_aging_touch(self):
2618 self._test_dns_delete_simple(10, 1e9, False, True)
2620 def windows_variation(self, fn, *args, msg=None, **kwargs):
2621 try:
2622 fn(*args, **kwargs)
2623 except AssertionError as e:
2624 print("Expected success on Windows only, failed as expected:\n" +
2625 c_GREEN(e))
2626 return
2627 print(c_RED("known Windows failure"))
2628 if msg is not None:
2629 print(c_DARK_YELLOW(msg))
2630 print("Expected success on Windows:\n" +
2631 c_GREEN(f"{fn.__name__} {args} {kwargs}"))
2633 def _test_dns_add_sibling(self, a_days, refresh, aging=True, touch=False):
2634 # Here we show that with aging enabled, the timestamp of
2635 # sibling records *is* modified when a record is added.
2637 # With aging disabled, it *is* modified, if the dns server has
2638 # seen it updated before ldap set the time (that is, probably
2639 # the dns server overwrites AD). This happens even if AD
2640 # thinks the record is static.
2641 name = 'test'
2642 A = ['A']
2643 B = ['B']
2644 self.set_zone_int_params(RefreshInterval=int(refresh),
2645 NoRefreshInterval=7,
2646 Aging=int(aging))
2648 now = dsdb_dns.unix_to_dns_timestamp(int(time.time()))
2649 a_days_ago = max(now - a_days * 24, 0)
2651 if touch:
2652 self.dns_update_record(name, A)
2654 self.ldap_update_record(name, A, dwTimeStamp=a_days_ago)
2656 atime = self.get_unique_txt_record(name, A).dwTimeStamp
2658 self.dns_update_record(name, B)
2659 a_rec = self.get_unique_txt_record(name, A)
2660 if not aging and touch:
2661 # On Windows, this resets the timestamp even if it is a
2662 # static record, though in that case it may be a
2663 # transitory effect of the DNS cache. We will insist on
2664 # the Samba behaviour of not changing (that is
2665 # un-static-ing) a zero timestamp, because that is the
2666 # sensible thing.
2667 if a_days_ago == 0:
2668 self.windows_variation(
2669 self.assert_soon_after, a_rec, now,
2670 msg="Windows resets static siblings (cache effect?)")
2671 self.assert_timestamps_equal(a_rec, 0)
2672 else:
2673 self.assert_soon_after(a_rec, now)
2674 else:
2675 self.assert_timestamps_equal(a_rec, atime)
2677 b_rec = self.get_unique_txt_record(name, B)
2678 self.assert_soon_after(b_rec, now)
2680 def test_dns_add_sibling_2_7_days_aging(self):
2681 self._test_dns_add_sibling(2, 7, True)
2683 def test_dns_add_sibling_2_7_days_no_aging(self):
2684 self._test_dns_add_sibling(2, 7, False)
2686 def test_dns_add_sibling_12_7_days_aging(self):
2687 self._test_dns_add_sibling(12, 7, True)
2689 def test_dns_add_sibling_12_7_days_no_aging(self):
2690 self._test_dns_add_sibling(12, 7, False)
2692 def test_dns_add_sibling_12_3_days_aging(self):
2693 self._test_dns_add_sibling(12, 3, True)
2695 def test_dns_add_sibling_12_3_days_no_aging(self):
2696 self._test_dns_add_sibling(12, 3, False)
2698 def test_dns_add_sibling_112_7_days_aging(self):
2699 self._test_dns_add_sibling(112, 7, True)
2701 def test_dns_add_sibling_112_7_days_no_aging(self):
2702 self._test_dns_add_sibling(112, 7, False)
2704 def test_dns_add_sibling_12_113_days_aging(self):
2705 self._test_dns_add_sibling(12, 113, True)
2707 def test_dns_add_sibling_12_113_days_no_aging(self):
2708 self._test_dns_add_sibling(12, 113, False)
2710 def test_dns_add_sibling_0_7_days_aging(self):
2711 # 1e9 days ago evaluates to 0, i.e static
2712 self._test_dns_add_sibling(1e9, 7, True)
2714 def test_dns_add_sibling_0_7_days_no_aging(self):
2715 self._test_dns_add_sibling(1e9, 7, False)
2717 def test_dns_add_sibling_0_0_days_aging(self):
2718 self._test_dns_add_sibling(1e9, 0, True)
2720 def test_dns_add_sibling_0_0_days_no_aging(self):
2721 self._test_dns_add_sibling(1e9, 0, False)
2723 def test_dns_add_sibling_10_0_days_aging(self):
2724 self._test_dns_add_sibling(10, 0, True)
2726 def test_dns_add_sibling_10_0_days_no_aging(self):
2727 self._test_dns_add_sibling(10, 0, False)
2729 def test_dns_add_sibling_2_7_days_aging_touch(self):
2730 self._test_dns_add_sibling(2, 7, True, True)
2732 def test_dns_add_sibling_2_7_days_no_aging_touch(self):
2733 self._test_dns_add_sibling(2, 7, False, True)
2735 def test_dns_add_sibling_12_7_days_aging_touch(self):
2736 self._test_dns_add_sibling(12, 7, True, True)
2738 def test_dns_add_sibling_12_7_days_no_aging_touch(self):
2739 self._test_dns_add_sibling(12, 7, False, True)
2741 def test_dns_add_sibling_12_3_days_aging_touch(self):
2742 self._test_dns_add_sibling(12, 3, True, True)
2744 def test_dns_add_sibling_12_3_days_no_aging_touch(self):
2745 self._test_dns_add_sibling(12, 3, False, True)
2747 def test_dns_add_sibling_112_7_days_aging_touch(self):
2748 self._test_dns_add_sibling(112, 7, True, True)
2750 def test_dns_add_sibling_112_7_days_no_aging_touch(self):
2751 self._test_dns_add_sibling(112, 7, False, True)
2753 def test_dns_add_sibling_12_113_days_aging_touch(self):
2754 self._test_dns_add_sibling(12, 113, True, True)
2756 def test_dns_add_sibling_12_113_days_no_aging_touch(self):
2757 self._test_dns_add_sibling(12, 113, False, True)
2759 def test_dns_add_sibling_0_7_days_aging_touch(self):
2760 self._test_dns_add_sibling(1e9, 7, True, True)
2762 def test_dns_add_sibling_0_7_days_no_aging_touch(self):
2763 self._test_dns_add_sibling(1e9, 7, False, True)
2765 def test_dns_add_sibling_0_0_days_aging_touch(self):
2766 self._test_dns_add_sibling(1e9, 0, True, True)
2768 def test_dns_add_sibling_0_0_days_no_aging_touch(self):
2769 self._test_dns_add_sibling(1e9, 0, False, True)
2771 def test_dns_add_sibling_10_0_days_aging_touch(self):
2772 self._test_dns_add_sibling(10, 0, True, True)
2774 def test_dns_add_sibling_10_0_days_no_aging_touch(self):
2775 self._test_dns_add_sibling(10, 0, False, True)
2777 TestProgram(module=__name__, opts=subunitopts)