ctdb-scripts: Improve update and listing code
[samba4-gss.git] / python / samba / tests / logfiles.py
blobe8fd4b0c5c4d365d4e8cecf647c6aa5234117933
1 # Unix SMB/CIFS implementation.
3 # Copyright (C) Catalyst.Net Ltd. 2022
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 import subprocess
20 import os
21 from samba.tests import TestCaseInTempDir
22 from pprint import pprint
# Directory holding this test module; the test binaries are located
# relative to it.
HERE = os.path.dirname(__file__)
S4_SERVER = os.path.join(HERE, '../../../../bin/test_s4_logging')
S3_SERVER = os.path.join(HERE, '../../../../bin/test_s3_logging')

# Debug class names. The position of a name in this list is the
# numeric code that is expected to show up in the log messages, so
# the order matters.
CLASS_LIST = [
    "all", "tdb", "printdrivers", "lanman", "smb", "rpc_parse",
    "rpc_srv", "rpc_cli", "passdb", "sam", "auth", "winbind", "vfs",
    "idmap", "quota", "acls", "locking", "msdfs", "dmapi", "registry",
    "scavenger", "dns", "ldb", "tevent", "auth_audit",
    "auth_json_audit", "kerberos", "drs_repl", "smb2", "smb2_credits",
    "dsdb_audit", "dsdb_json_audit", "dsdb_password_audit",
    "dsdb_password_json_audit", "dsdb_transaction_audit",
    "dsdb_transaction_json_audit", "dsdb_group_audit",
    "dsdb_group_json_audit", "ldapsrv",
]

# Reverse lookup: debug class name -> index in CLASS_LIST.
CLASS_CODES = {name: code for code, name in enumerate(CLASS_LIST)}
class S4LoggingTests(TestCaseInTempDir):
    """Check where a test logging binary sends its debug messages.

    Each test writes an smb.conf into the temporary directory, runs
    the binary named by the `server` attribute against it, and then
    checks the log files (or stderr) for the expected
    "logging for '<class>' [<code>], at level <n>" lines.
    """
    server = S4_SERVER

    def _write_smb_conf(self,
                        default_level=2,
                        default_file="default",
                        mapping=()):
        """Write an smb.conf with 'log file' and 'log level' settings.

        default_level -- leading value of the 'log level' line; a
                         falsy value (None or 0) writes no leading
                         value at all.
        default_file  -- name, relative to the tempdir, used for the
                         'log file' line; None omits that line.
        mapping       -- iterable of (class, level, file) tuples, each
                         appended to 'log level' as
                         ' class[:level][@tempdir/file]'. A level or
                         file of None leaves that part out.

        The path is remembered in self.smbconf and the file is
        unlinked on test cleanup.
        """
        self.smbconf = os.path.join(self.tempdir, "smb.conf")

        with open(self.smbconf, "w") as f:
            f.write('[global]\n')
            if default_file is not None:
                dest = os.path.join(self.tempdir, default_file)
                f.write(f" log file = {dest}\n")

            f.write(" log level = ")
            if default_level:
                f.write(f"{default_level}")

            for dbg_class, log_level, log_file in mapping:
                f.write(' ')
                f.write(dbg_class)
                if log_level is not None:
                    f.write(f':{log_level}')
                if log_file is not None:
                    dest = os.path.join(self.tempdir, log_file)
                    f.write(f'@{dest}')
            f.write('\n')

        self.addCleanup(os.unlink, self.smbconf)

    def _extract_log_level_line(self, new_level=2):
        """Remove the 'log level' value from smb.conf and return it.

        The smb.conf is rewritten in place: every line containing
        'log level' is replaced with an innocuous
        ' log level = <new_level>' line or, when new_level is None,
        dropped entirely.

        Returns the stripped text following the first '=' on the
        (last) 'log level' line. The test fails if there is no such
        line, rather than tripping over an unbound variable.
        """
        smbconf2 = self.smbconf + 'new'
        debug_arg = None
        with open(self.smbconf) as f, open(smbconf2, 'w') as f2:
            for line in f:
                if 'log level' not in line:
                    f2.write(line)
                    continue
                debug_arg = line.split('=', 1)[1].strip()
                if new_level is not None:
                    f2.write(f' log level = {new_level}\n')

        # Always swap the new file in, so no stray temporary file is
        # left behind in the tempdir even when we are about to fail.
        os.replace(smbconf2, self.smbconf)
        if debug_arg is None:
            self.fail(f"no 'log level' line found in {self.smbconf}")
        return debug_arg

    def _get_expected_strings(self, mapping,
                              level_filter,
                              default_file='default',
                              file_filter=None):
        """Work out which lines each log file should contain.

        mapping      -- iterable of (class, level, file) tuples; a
                        file of None means default_file.
        level_filter -- the level the messages are emitted at; classes
                        configured below this level expect no line.
        default_file -- name of the default log file in the tempdir.
        file_filter  -- if not None, only classes mapped to this file
                        name contribute lines.

        Returns a dict mapping full file paths to lists of expected
        lines. Every file mentioned in the mapping, and the default
        file, gets an entry even if its list is empty.
        """
        default = os.path.join(self.tempdir, default_file)
        expected = {default: []}
        # this kind of thing:
        # " logging for 'dns' [21], at level 4"
        for dbg_class, log_level, log_file in mapping:
            if log_file is None:
                log_file = default_file

            path = os.path.join(self.tempdir, log_file)
            lines = expected.setdefault(path, [])
            if log_level < level_filter:
                continue
            if file_filter not in (None, log_file):
                continue
            # NOTE(review): lines are compared whole by
            # assert_string_contains(), so the leading whitespace here
            # has to match what the logging binary writes -- confirm.
            lines.append(
                f" logging for '{dbg_class}' [{CLASS_CODES[dbg_class]}], "
                f"at level {level_filter}")

        return expected

    def _run_s4_logger(self, log_level, *extra_args):
        """Run self.server with '-s <smb.conf> -L <log_level>'.

        Any extra_args are appended to the command line. The test
        fails if the exit status is not 0; the failure message
        includes the child's stderr to make that diagnosable.

        Returns a (stdout, stderr) tuple of decoded strings.
        """
        cmd = [self.server,
               '-s', self.smbconf,
               '-L', str(log_level),
               *extra_args]

        p = subprocess.run(cmd,
                           stdout=subprocess.PIPE,
                           stderr=subprocess.PIPE)
        if p.returncode != 0:
            # Decode leniently here: this is only for the message.
            err_text = p.stderr.decode(errors='replace')
            self.assertEqual(p.returncode, 0,
                             f"'{' '.join(cmd)}' failed ({p.returncode})"
                             f"\nstderr:\n{err_text}")

        return p.stdout.decode(), p.stderr.decode()

    def _misc_level_mapping(self, level):
        """Build the mapping shared by the 'misc log level' tests.

        Even-indexed classes have no file (so use the default),
        odd-indexed ones go to 'odd'; the log level cycles through
        level + 1, level, level - 1 with the index mod 3.
        """
        dests = [None, 'odd']
        return [(x, level + 1 - (i % 3), dests[i & 1])
                for i, x in enumerate(CLASS_LIST)]

    def _scrambled_mapping(self, seed, multiplier):
        """Build a deterministic but irregular mapping.

        A small generator driven by seed and multiplier picks, for
        each class, one of four destinations (default, 'a', 'b', 'c')
        and a log level in the range 0-9.
        """
        dests = [None, 'a', 'b', 'c']
        mapping = []
        for x in CLASS_LIST:
            d = seed & 3
            seed = seed * multiplier + 1
            log_level = seed % 10
            seed &= 0xff
            mapping.append((x, log_level, dests[d]))
        return mapping

    @staticmethod
    def _stderr_lines(expected_strings):
        """Flatten per-file expected lines into one stripped list.

        stderr doesn't end up with the leading ' ', so each line is
        stripped.
        """
        return [x.strip()
                for v in expected_strings.values()
                for x in v]

    def assert_string_contains(self, string, expected_lines,
                               filename=None):
        """Assert every expected line is a whole line of string.

        On failure the missing lines (and filename, if given) are
        printed before AssertionError is raised.
        """
        expected_lines = set(expected_lines)
        missing_lines = expected_lines - set(string.split('\n'))
        if missing_lines:
            if filename:
                print(filename)
            print("expected %d lines, found %d" %
                  (len(expected_lines),
                   len(expected_lines) - len(missing_lines)))
            print("missing lines:")
            pprint(missing_lines)
            raise AssertionError("missing lines")

    def assert_file_contains(self, filename, expected_lines):
        """Assert the named file contains every expected line."""
        with open(filename) as f:
            string = f.read()
        self.assert_string_contains(string, expected_lines, filename)

    def assert_n_known_lines_string(self, string, n):
        """Assert string has exactly n "logging for '" occurrences."""
        count = string.count("logging for '")
        if count != n:
            raise AssertionError(
                f"string has {count} lines, expected {n}")

    def assert_n_known_lines(self, filename, n):
        """Assert the file has exactly n " logging for '" occurrences.

        Unlike assert_n_known_lines_string(), the pattern counted
        here includes the leading ' '.
        """
        with open(filename) as f:
            string = f.read()
        count = string.count(" logging for '")
        if count != n:
            raise AssertionError(
                f"{filename} has {count} lines, expected {n}")

    def assert_unlink_expected_strings(self, expected_strings):
        """Check each expected log file, then remove it.

        expected_strings maps file paths to lists of expected lines,
        as returned by _get_expected_strings(). Each file must exist,
        contain all of its lines, and contain no further known lines.
        """
        for k, v in expected_strings.items():
            if not os.path.exists(k):
                self.fail(f"{k} does not exist")
            self.assert_file_contains(k, v)
            self.assert_n_known_lines(k, len(v))
            os.unlink(k)

    def test_each_to_its_own(self):
        # every class logs to a file named after itself.
        level = 4
        mapping = [(x, level, x) for x in CLASS_LIST]
        expected_strings = self._get_expected_strings(mapping, level)

        self._write_smb_conf(mapping=mapping)
        self._run_s4_logger(level)
        self.assert_unlink_expected_strings(expected_strings)

    def test_all_to_one(self):
        # every class logs to the same non-default file.
        level = 4
        dest = 'everything'
        mapping = [(x, level, dest) for x in CLASS_LIST]
        expected_strings = self._get_expected_strings(mapping, level)

        self._write_smb_conf(mapping=mapping)
        self._run_s4_logger(level)
        self.assert_unlink_expected_strings(expected_strings)

    def test_bifurcate(self):
        # classes alternate between two files, all configured one
        # level above the level we log at.
        level = 4
        dests = ['even', 'odd']
        mapping = [(x, level + 1, dests[i & 1])
                   for i, x in enumerate(CLASS_LIST)]
        expected_strings = self._get_expected_strings(mapping, level)

        self._write_smb_conf(mapping=mapping)
        self._run_s4_logger(level)
        self.assert_unlink_expected_strings(expected_strings)

    def test_bifurcate_level_out_of_range(self):
        # nothing will be logged, because we're logging at a too high
        # level.
        level = 4
        dests = ['even', 'odd']
        mapping = [(x, level - 1, dests[i & 1])
                   for i, x in enumerate(CLASS_LIST)]
        expected_strings = self._get_expected_strings(mapping, level)

        self._write_smb_conf(mapping=mapping)
        self._run_s4_logger(level)
        self.assert_unlink_expected_strings(expected_strings)

    def test_bifurcate_misc_log_level(self):
        # We are sending even numbers to default and odd numbers to
        # 'odd', at various levels, depending on mod 3. Like this:
        #
        # log level = 2 all:5 \
        #             tdb:4@odd \
        #             printdrivers:3 \
        #             lanman:5@odd \
        #             smb:4 \
        #             rpc_parse:3@odd \
        #             rpc_srv:5 ...
        #
        # Therefore, 'default' should get classes that are (0 or 4) % 6
        # and 'odd' should get classes that are (1 or 3) % 6.
        level = 4
        mapping = self._misc_level_mapping(level)
        expected_strings = self._get_expected_strings(mapping, level)

        self._write_smb_conf(mapping=mapping)
        self._run_s4_logger(level)
        self.assert_unlink_expected_strings(expected_strings)

    def test_all_different_ways_cmdline_d(self):
        # the 'log level' value is taken out of smb.conf (which is
        # left with a plain 'log level = 26') and passed via -d.
        level = 4
        mapping = self._scrambled_mapping(seed=123, multiplier=17)
        expected_strings = self._get_expected_strings(mapping, level)

        self._write_smb_conf(mapping=mapping)
        debug_arg = self._extract_log_level_line(26)

        self._run_s4_logger(level, '-d', debug_arg)
        self.assert_unlink_expected_strings(expected_strings)

    def test_all_different_ways_cmdline_d_interactive(self):
        # as above, but the 'log level' line is removed from smb.conf
        # altogether and, with -i, the messages are looked for on
        # stderr.
        level = 4
        mapping = self._scrambled_mapping(seed=1234, multiplier=13)
        expected_strings = self._get_expected_strings(mapping, level)

        self._write_smb_conf(mapping=mapping)
        debug_arg = self._extract_log_level_line(None)
        _, stderr = self._run_s4_logger(level, '-d', debug_arg, '-i')
        expected_lines = self._stderr_lines(expected_strings)

        self.assert_string_contains(stderr, expected_lines)
        self.assert_n_known_lines_string(stderr, len(expected_lines))

    def test_only_some_level_0(self):
        # running the logger with -L 0 makes the log messages run at
        # level 0 (i.e DBG_ERR), so we always see them in default,
        # even though smb.conf doesn't ask.
        mapping = [(x, 3, 'bees' if 'b' in x else 'default')
                   for x in CLASS_LIST]
        expected_strings = self._get_expected_strings(mapping, 0)
        self._write_smb_conf(mapping=[x for x in mapping if x[2] == 'bees'])
        self._run_s4_logger(0)
        self.assert_unlink_expected_strings(expected_strings)

    def test_only_some_level_3(self):
        # here, we're expecting the unmentioned non-b classes to just
        # disappear.
        level = 3
        mapping = [(x, level, 'bees') for x in CLASS_LIST if 'b' in x]
        expected_strings = self._get_expected_strings(mapping, level)
        # every entry already targets 'bees', so the whole mapping
        # goes into smb.conf.
        self._write_smb_conf(mapping=mapping)
        self._run_s4_logger(level)
        self.assert_unlink_expected_strings(expected_strings)

    def test_none(self):
        # no per-class settings, default level 2: the default file is
        # expected to exist with no known lines.
        level = 4
        mapping = []
        expected_strings = self._get_expected_strings(mapping, level)
        self._write_smb_conf(mapping=mapping)
        self._run_s4_logger(level)
        self.assert_unlink_expected_strings(expected_strings)

    def test_none_high_default(self):
        # We set the default level to 5 and do nothing else special,
        # which means we need a different mapping for the smb.conf
        # than the expected strings.
        level = 4
        mapping = [(x, 5, 'default') for x in CLASS_LIST]
        expected_strings = self._get_expected_strings(mapping, level)
        # note the empty mapping in smb.conf
        self._write_smb_conf(mapping=[], default_level=5)
        self._run_s4_logger(level)
        self.assert_unlink_expected_strings(expected_strings)

    def test_none_high_cmdline_d(self):
        # We set the default level to 2, but run the 'server' with -d 10.
        level = 4
        mapping = [(x, 10, 'default') for x in CLASS_LIST]
        expected_strings = self._get_expected_strings(mapping, level)
        # note the empty mapping in smb.conf
        self._write_smb_conf(mapping=[])
        self._run_s4_logger(level, '-d', '10')
        self.assert_unlink_expected_strings(expected_strings)

    def test_interactive_high_default_simple(self):
        # running with -i should send everything to stderr.
        level = 4
        mapping = [(x, 5, 'default') for x in CLASS_LIST]
        expected_strings = self._get_expected_strings(mapping, level)
        self._write_smb_conf(mapping=[], default_level=5)
        _, stderr = self._run_s4_logger(level, '-i')
        expected_lines = self._stderr_lines(expected_strings)

        self.assert_string_contains(stderr, expected_lines)

    def test_interactive_complex_smb_conf(self):
        # running with -i should send everything to stderr. The
        # smb.conf will set the levels, but the target files are
        # overridden.
        # (this is the test_bifurcate_misc_log_level() smb.conf).
        level = 4
        mapping = self._misc_level_mapping(level)
        expected_strings = self._get_expected_strings(mapping, level)

        self._write_smb_conf(mapping=mapping)
        _, stderr = self._run_s4_logger(level, '-i')
        expected_lines = self._stderr_lines(expected_strings)

        self.assert_string_contains(stderr, expected_lines)
class S3LoggingTests(S4LoggingTests):
    # Re-run every inherited test against the S3 flavour of the test
    # binary. The tests were written for the test_logger linked
    # against CMDLINE_S4 (see lib/util/wscript_build), but they can
    # equally run when it is linked against CMDLINE_S3.
    server = S3_SERVER