1 # Unix SMB/CIFS implementation.
3 # Copyright (C) Catalyst.Net Ltd. 2022
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 from samba
.tests
import TestCaseInTempDir
22 from pprint
import pprint
24 HERE
= os
.path
.dirname(__file__
)
25 S4_SERVER
= os
.path
.join(HERE
, '../../../../bin/test_s4_logging')
26 S3_SERVER
= os
.path
.join(HERE
, '../../../../bin/test_s3_logging')
28 CLASS_LIST
= ["all", "tdb", "printdrivers", "lanman", "smb",
29 "rpc_parse", "rpc_srv", "rpc_cli", "passdb", "sam", "auth",
30 "winbind", "vfs", "idmap", "quota", "acls", "locking", "msdfs",
31 "dmapi", "registry", "scavenger", "dns", "ldb", "tevent",
32 "auth_audit", "auth_json_audit", "kerberos", "drs_repl",
33 "smb2", "smb2_credits", "dsdb_audit", "dsdb_json_audit",
34 "dsdb_password_audit", "dsdb_password_json_audit",
35 "dsdb_transaction_audit", "dsdb_transaction_json_audit",
36 "dsdb_group_audit", "dsdb_group_json_audit", "ldapsrv"]
39 CLASS_CODES
= {k
: i
for i
, k
in enumerate(CLASS_LIST
)}
42 class S4LoggingTests(TestCaseInTempDir
):
44 def _write_smb_conf(self
,
46 default_file
="default",
48 self
.smbconf
= os
.path
.join(self
.tempdir
, "smb.conf")
50 with
open(self
.smbconf
, "w") as f
:
52 if default_file
is not None:
53 dest
= os
.path
.join(self
.tempdir
,
55 f
.write(f
" log file = {dest}\n")
57 f
.write(" log level = ")
59 f
.write(f
"{default_level}")
61 for dbg_class
, log_level
, log_file
in mapping
:
64 if log_level
is not None:
65 f
.write(f
':{log_level}')
66 if log_file
is not None:
67 dest
= os
.path
.join(self
.tempdir
,
72 self
.addCleanup(os
.unlink
, self
.smbconf
)
74 def _extract_log_level_line(self
, new_level
=2):
75 # extricate the 'log level' line from the smb.conf, returning
76 # the value, and replacing the log level line with something
78 smbconf2
= self
.smbconf
+ 'new'
79 with
open(self
.smbconf
) as f
:
80 with
open(smbconf2
, 'w') as f2
:
82 if 'log level' in line
:
83 debug_arg
= line
.split('=', 1)[1].strip()
84 if new_level
is not None:
85 f2
.write(f
' log level = {new_level}\n')
88 os
.replace(smbconf2
, self
.smbconf
)
91 def _get_expected_strings(self
, mapping
,
93 default_file
='default',
95 default
= os
.path
.join(self
.tempdir
, default_file
)
96 expected
= {default
: []}
98 # " logging for 'dns' [21], at level 4"
99 for dbg_class
, log_level
, log_file
in mapping
:
101 log_file
= default_file
103 f
= os
.path
.join(self
.tempdir
, log_file
)
104 expected
.setdefault(f
, [])
105 if log_level
< level_filter
:
107 if file_filter
not in (None, log_file
):
109 s
= (f
" logging for '{dbg_class}' [{CLASS_CODES[dbg_class]}], "
110 f
"at level {level_filter}")
111 expected
[f
].append(s
)
115 def _run_s4_logger(self
, log_level
, *extra_args
):
118 '-L', str(log_level
),
121 p
= subprocess
.run(cmd
,
122 stdout
=subprocess
.PIPE
,
123 stderr
=subprocess
.PIPE
)
124 self
.assertEqual(p
.returncode
, 0,
125 f
"'{' '.join(cmd)}' failed ({p.returncode})")
127 return p
.stdout
.decode(), p
.stderr
.decode()
129 def assert_string_contains(self
, string
, expected_lines
,
131 expected_lines
= set(expected_lines
)
132 string_lines
= set(string
.split('\n'))
133 present_lines
= string_lines
& expected_lines
134 if present_lines
!= expected_lines
:
137 print("expected %d lines, found %d" %
138 (len(expected_lines
), len(present_lines
)))
139 print("missing lines:")
140 pprint(expected_lines
- present_lines
)
141 raise AssertionError("missing lines")
143 def assert_file_contains(self
, filename
, expected_lines
):
144 with
open(filename
) as f
:
146 self
.assert_string_contains(string
, expected_lines
, filename
)
148 def assert_n_known_lines_string(self
, string
, n
):
149 count
= string
.count("logging for '")
151 raise AssertionError(
152 f
"string has {count} lines, expected {n}")
154 def assert_n_known_lines(self
, filename
, n
):
155 with
open(filename
) as f
:
157 count
= string
.count(" logging for '")
159 raise AssertionError(
160 f
"{filename} has {count} lines, expected {n}")
162 def assert_unlink_expected_strings(self
, expected_strings
):
163 for k
, v
in expected_strings
.items():
164 if not os
.path
.exists(k
):
165 self
.fail(f
"{k} does not exist")
166 self
.assert_file_contains(k
, v
)
167 self
.assert_n_known_lines(k
, len(v
))
170 def test_each_to_its_own(self
):
172 mapping
= [(x
, level
, x
) for x
in CLASS_LIST
]
173 expected_strings
= self
._get
_expected
_strings
(mapping
, level
)
175 self
._write
_smb
_conf
(mapping
=mapping
)
176 stdout
, stderr
= self
._run
_s
4_logger
(level
)
177 self
.assert_unlink_expected_strings(expected_strings
)
179 def test_all_to_one(self
):
182 mapping
= [(x
, level
, dest
) for x
in CLASS_LIST
]
183 expected_strings
= self
._get
_expected
_strings
(mapping
, level
)
185 self
._write
_smb
_conf
(mapping
=mapping
)
186 stdout
, stderr
= self
._run
_s
4_logger
(level
)
187 self
.assert_unlink_expected_strings(expected_strings
)
189 def test_bifurcate(self
):
191 dests
= ['even', 'odd']
192 mapping
= [(x
, level
+ 1, dests
[i
& 1])
193 for i
, x
in enumerate(CLASS_LIST
)]
194 expected_strings
= self
._get
_expected
_strings
(mapping
, level
)
196 self
._write
_smb
_conf
(mapping
=mapping
)
197 stdout
, stderr
= self
._run
_s
4_logger
(level
)
198 self
.assert_unlink_expected_strings(expected_strings
)
200 def test_bifurcate_level_out_of_range(self
):
201 # nothing will be logged, because we're logging at a too high
204 dests
= ['even', 'odd']
205 mapping
= [(x
, level
- 1, dests
[i
& 1])
206 for i
, x
in enumerate(CLASS_LIST
)]
207 expected_strings
= self
._get
_expected
_strings
(mapping
, level
)
209 self
._write
_smb
_conf
(mapping
=mapping
)
210 stdout
, stderr
= self
._run
_s
4_logger
(level
)
211 self
.assert_unlink_expected_strings(expected_strings
)
213 def test_bifurcate_misc_log_level(self
):
214 # We are sending even numbers to default and odd numbers to
215 # 'odd', at various levels, depending on mod 3. Like this:
217 # log level = 2 all:5 \
225 # Therefore, 'default' should get classes that are (0 or 4) % 6
226 # and 'odd' should get classes that are (1 or 3) % 6.
229 dests
= [None, 'odd']
231 for i
, x
in enumerate(CLASS_LIST
):
233 log_level
= level
+ 1 - (i
% 3)
234 mapping
.append((x
, log_level
, dests
[parity
]))
236 expected_strings
= self
._get
_expected
_strings
(mapping
, level
)
238 self
._write
_smb
_conf
(mapping
=mapping
)
239 stdout
, stderr
= self
._run
_s
4_logger
(level
)
240 self
.assert_unlink_expected_strings(expected_strings
)
242 def test_all_different_ways_cmdline_d(self
):
244 dests
= [None, 'a', 'b', 'c']
247 for i
, x
in enumerate(CLASS_LIST
):
250 log_level
= seed
% 10
252 mapping
.append((x
, log_level
, dests
[d
]))
254 expected_strings
= self
._get
_expected
_strings
(mapping
, level
)
256 self
._write
_smb
_conf
(mapping
=mapping
)
257 debug_arg
= self
._extract
_log
_level
_line
(26)
259 stdout
, stderr
= self
._run
_s
4_logger
(level
, '-d', debug_arg
)
260 self
.assert_unlink_expected_strings(expected_strings
)
262 def test_all_different_ways_cmdline_d_interactive(self
):
264 dests
= [None, 'a', 'b', 'c']
267 for i
, x
in enumerate(CLASS_LIST
):
270 log_level
= seed
% 10
272 mapping
.append((x
, log_level
, dests
[d
]))
274 expected_strings
= self
._get
_expected
_strings
(mapping
, level
)
276 self
._write
_smb
_conf
(mapping
=mapping
)
277 debug_arg
= self
._extract
_log
_level
_line
(None)
278 stdout
, stderr
= self
._run
_s
4_logger
(level
, '-d', debug_arg
, '-i')
280 for v
in expected_strings
.values():
281 # stderr doesn't end up with leading ' '
282 expected_lines
.extend([x
.strip() for x
in v
])
284 self
.assert_string_contains(stderr
, expected_lines
)
285 self
.assert_n_known_lines_string(stderr
, len(expected_lines
))
287 def test_only_some_level_0(self
):
288 # running the logger with -L 0 makes the log messages run at
289 # level 0 (i.e DBG_ERR), so we always see them in default,
290 # even though smb.conf doesn't ask.
291 mapping
= [(x
, 3, ['default', 'bees']['b' in x
])
293 expected_strings
= self
._get
_expected
_strings
(mapping
, 0)
294 self
._write
_smb
_conf
(mapping
=[x
for x
in mapping
if x
[2] == 'bees'])
295 stdout
, stderr
= self
._run
_s
4_logger
(0)
296 self
.assert_unlink_expected_strings(expected_strings
)
298 def test_only_some_level_3(self
):
299 # here, we're expecting the unmentioned non-b classes to just
302 mapping
= [(x
, level
, 'bees') for x
in CLASS_LIST
if 'b' in x
]
303 expected_strings
= self
._get
_expected
_strings
(mapping
, level
)
304 self
._write
_smb
_conf
(mapping
=[x
for x
in mapping
if x
[2] == 'bees'])
305 stdout
, stderr
= self
._run
_s
4_logger
(level
)
306 self
.assert_unlink_expected_strings(expected_strings
)
311 expected_strings
= self
._get
_expected
_strings
(mapping
, level
)
312 self
._write
_smb
_conf
(mapping
=mapping
)
313 stdout
, stderr
= self
._run
_s
4_logger
(level
)
314 self
.assert_unlink_expected_strings(expected_strings
)
316 def test_none_high_default(self
):
317 # We set the default level to 5 and do nothing else special,
318 # which means we need a different mapping for the smb.conf
319 # than the expected strings.
321 mapping
= [(x
, 5, 'default') for x
in CLASS_LIST
]
322 expected_strings
= self
._get
_expected
_strings
(mapping
, level
)
323 # note the empty mapping in smb.conf
324 self
._write
_smb
_conf
(mapping
=[], default_level
=5)
325 stdout
, stderr
= self
._run
_s
4_logger
(level
)
326 self
.assert_unlink_expected_strings(expected_strings
)
328 def test_none_high_cmdline_d(self
):
329 # We set the default level to 2, but run the 'server' with -d 10.
331 mapping
= [(x
, 10, 'default') for x
in CLASS_LIST
]
332 expected_strings
= self
._get
_expected
_strings
(mapping
, level
)
333 # note the empty mapping in smb.conf
334 self
._write
_smb
_conf
(mapping
=[])
335 stdout
, stderr
= self
._run
_s
4_logger
(level
, '-d', '10')
336 self
.assert_unlink_expected_strings(expected_strings
)
338 def test_interactive_high_default_simple(self
):
339 # running with -i should send everything to stderr.
341 mapping
= [(x
, 5, 'default') for x
in CLASS_LIST
]
342 expected_strings
= self
._get
_expected
_strings
(mapping
, level
)
343 self
._write
_smb
_conf
(mapping
=[], default_level
=5)
344 stdout
, stderr
= self
._run
_s
4_logger
(level
, '-i')
346 for v
in expected_strings
.values():
347 # stderr doesn't end up with leading ' '
348 expected_lines
.extend([x
.strip() for x
in v
])
350 self
.assert_string_contains(stderr
, expected_lines
)
352 def test_interactive_complex_smb_conf(self
):
353 # running with -i should send everything to stderr. The
354 # smb.conf will set the levels, but the target files are
356 # (this is the test_bifurcate_misc_log_level() smb.conf).
358 dests
= [None, 'odd']
360 for i
, x
in enumerate(CLASS_LIST
):
362 log_level
= level
+ 1 - (i
% 3)
363 mapping
.append((x
, log_level
, dests
[parity
]))
365 expected_strings
= self
._get
_expected
_strings
(mapping
, level
)
367 self
._write
_smb
_conf
(mapping
=mapping
)
368 stdout
, stderr
= self
._run
_s
4_logger
(level
, '-i')
370 for v
in expected_strings
.values():
371 # stderr doesn't end up with leading ' '
372 expected_lines
.extend([x
.strip() for x
in v
])
374 self
.assert_string_contains(stderr
, expected_lines
)
377 class S3LoggingTests(S4LoggingTests
):
379 # These tests were developed for testing the test_logger when
380 # linked against CMDLINE_S4 (see lib/util/wscript_build), but can
381 # also run when linked against CMDLINE_S3.