1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 """Tests for the Auth and AuthZ logging.
22 from samba
.messaging
import Messaging
23 from samba
.dcerpc
.messaging
import MSG_AUTH_LOG
, AUTH_EVENT_NAME
24 from samba
.param
import LoadParm
25 from samba
import string_is_guid
def default_msg_filter(msg):
    """Return True if *msg* is of a type the original auth log tests expect.

    The authentication logging tests were written when only a fixed set of
    message types existed, and they assume no other types will ever show
    up. Newer message types would reach those tests unexpectedly and make
    them fail, so this default filter lets through only the types the
    tests are prepared for.

    NOTE(review): the accepted types listed below were reconstructed from
    the waitForMessages() documentation ("only Authentication and
    Authorization messages will be returned") -- confirm against the
    original set literal.
    """
    expected_types = {
        "Authentication",
        "Authorization",
    }

    msg_type = msg['type']
    return msg_type in expected_types
class NoMessageException(Exception):
    """Raised when no relevant message turns up before the wait times out."""
51 class AuthLogTestBase(samba
.tests
.TestCase
):
57 # connect to the server's messaging bus (we need to explicitly load a
58 # different smb.conf here, because in all other respects this test
59 # wants to act as a separate remote client)
60 server_conf
= os
.getenv('SERVERCONFFILE')
62 lp_ctx
= LoadParm(filename_for_non_global_lp
=server_conf
)
64 lp_ctx
= samba
.tests
.env_loadparm()
65 cls
.msg_ctx
= Messaging((1,), lp_ctx
=lp_ctx
)
66 cls
.msg_ctx
.irpc_add_name(AUTH_EVENT_NAME
)
68 # Now switch back to using the client-side smb.conf. The tests will
69 # use the first interface in the client.conf (we need to strip off
70 # the subnet mask portion)
71 lp_ctx
= samba
.tests
.env_loadparm()
72 client_ip_and_mask
= lp_ctx
.get('interfaces')[0]
73 client_ip
= client_ip_and_mask
.split('/')[0]
75 # the messaging ctx is the server's view of the world, so our own
76 # client IP will be the remoteAddress when connections are logged
77 cls
.remoteAddress
= client_ip
79 def messageHandler(context
, msgType
, src
, message
):
80 # This does not look like sub unit output and it
81 # makes these tests much easier to debug.
83 jsonMsg
= json
.loads(message
)
84 context
["messages"].append(jsonMsg
)
86 cls
.context
= {"messages": []}
87 cls
.msg_handler_and_context
= (messageHandler
, cls
.context
)
88 cls
.msg_ctx
.register(cls
.msg_handler_and_context
,
89 msg_type
=MSG_AUTH_LOG
)
91 cls
.server
= os
.environ
["SERVER"]
def tearDownClass(cls):
    """Detach the test class from the messaging bus.

    Deregisters the handler/context pair that was registered for
    MSG_AUTH_LOG messages, removes the AUTH_EVENT_NAME irpc name from the
    messaging context, and finally runs the parent class tear-down.
    """
    bus = cls.msg_ctx
    handler_and_context = cls.msg_handler_and_context

    # Undo the registration first, then give up the irpc name.
    bus.deregister(handler_and_context, msg_type=MSG_AUTH_LOG)
    bus.irpc_remove_name(AUTH_EVENT_NAME)

    super().tearDownClass()
104 type(self
).discardMessages()
106 def isRemote(self
, message
):
107 if self
.remoteAddress
is None:
115 message_type
= message
["type"]
116 if message_type
in supported_types
:
117 remote
= message
[message_type
]["remoteAddress"]
122 addr
= remote
.split(":")
123 return addr
[1] == self
.remoteAddress
127 def waitForMessages(self
, isLastExpectedMessage
, connection
=None, *,
128 msgFilter
=default_msg_filter
):
129 """Wait for all the expected messages to arrive
130 The connection is passed through to keep the connection alive
131 until all the logging messages have been received.
133 By default, only Authentication and Authorization messages will be
134 returned, so that old tests continue to pass. To receive all messages,
142 msg
= self
.nextMessage(msgFilter
=msgFilter
)
143 except NoMessageException
:
147 if isLastExpectedMessage(msg
):
150 def nextMessage(self
, msgFilter
=None):
151 """Return the next relevant message, or throw a NoMessageException."""
152 def is_relevant(msg
):
153 if not self
.isRemote(msg
):
156 if msgFilter
is None:
159 return msgFilter(msg
)
161 messages
= self
.context
['messages']
165 until
= time
.time() + timeout
168 # Fetch a new message from the messaging bus.
170 current
= time
.time()
174 self
.msg_ctx
.loop_once(until
- current
)
177 raise NoMessageException('timed out looking for a message')
179 # Grab the next message from the queue.
180 msg
= messages
.pop(0)
184 # Discard any previously queued messages.
186 def discardMessages(cls
):
187 messages
= cls
.context
["messages"]
192 # tevent presumably has other tasks to run, so we might need two or
193 # three loops before a message comes through.
195 cls
.msg_ctx
.loop_once(0.001)
198 # No new messages. We’ve probably got them all.
201 # Remove any NETLOGON authentication messages
202 # NETLOGON is only performed once per session, so to avoid ordering
203 # dependencies within the tests it's best to strip out NETLOGON messages.
def remove_netlogon_messages(self, messages):
    """Return a new list of *messages* without NETLOGON authentications.

    A message is dropped only when it carries an "Authentication" entry
    whose "serviceDescription" is "NETLOGON"; every other message is kept,
    in its original order. The input list itself is not modified.
    """
    kept = []
    for entry in messages:
        if "Authentication" in entry:
            service = entry["Authentication"]["serviceDescription"]
            if service == "NETLOGON":
                # NETLOGON authentication: leave it out of the result.
                continue
        kept.append(entry)
    return kept
def is_guid(self, guid):
    """Report whether the supplied string is a correctly formatted GUID.

    The check itself is delegated to string_is_guid(); its result is
    returned unchanged.
    """
    well_formed = string_is_guid(guid)
    return well_formed