1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "athena/system/device_socket_listener.h"
9 #include <sys/socket.h>
10 #include <sys/types.h>
13 #include "base/bind.h"
14 #include "base/files/file_path.h"
15 #include "base/message_loop/message_loop.h"
16 #include "base/observer_list.h"
17 #include "base/stl_util.h"
18 #include "ipc/unix_domain_socket_util.h"
24 typedef ObserverList
<DeviceSocketListener
> DeviceSocketListeners
;
26 // Reads from a device socket blocks of a particular size. When that amount of
27 // data is read DeviceSocketManager::OnDataAvailable is called on the singleton
28 // instance which then informs all of the listeners on that socket.
29 class DeviceSocketReader
: public base::MessagePumpLibevent::Watcher
{
31 DeviceSocketReader(const std::string
& socket_path
,
33 : socket_path_(socket_path
),
34 data_size_(data_size
),
35 data_(new char[data_size
]) {
37 virtual ~DeviceSocketReader() {}
40 // Overidden from base::MessagePumpLibevent::Watcher.
41 virtual void OnFileCanReadWithoutBlocking(int fd
) OVERRIDE
;
42 virtual void OnFileCanWriteWithoutBlocking(int fd
) OVERRIDE
;
44 std::string socket_path_
;
46 scoped_ptr
<char[]> data_
;
48 DISALLOW_COPY_AND_ASSIGN(DeviceSocketReader
);
51 class DeviceSocketManager
;
52 DeviceSocketManager
* device_socket_manager_instance_
= NULL
;
54 // A singleton instance for managing all connections to sockets.
55 class DeviceSocketManager
{
57 static void Create(scoped_refptr
<base::TaskRunner
> file_task_runner
) {
58 device_socket_manager_instance_
= new DeviceSocketManager(file_task_runner
);
61 static void Shutdown() {
62 CHECK(device_socket_manager_instance_
);
63 device_socket_manager_instance_
->ScheduleDelete();
64 // Once scheduled to be deleted, no-one should be
66 device_socket_manager_instance_
= NULL
;
69 static DeviceSocketManager
* GetInstanceUnsafe() {
70 return device_socket_manager_instance_
;
73 static DeviceSocketManager
* GetInstance() {
74 CHECK(device_socket_manager_instance_
);
75 return device_socket_manager_instance_
;
78 // If there isn't an existing connection to |socket_path|, then opens a
79 // connection to |socket_path| and starts listening for data. All listeners
80 // for |socket_path| receives data when data is available on the socket.
81 void StartListening(const std::string
& socket_path
,
83 DeviceSocketListener
* listener
);
85 // Removes |listener| from the list of listeners that receive data from
86 // |socket_path|. If this is the last listener, then this closes the
87 // connection to the socket.
88 void StopListening(const std::string
& socket_path
,
89 DeviceSocketListener
* listener
);
91 // Sends data to all the listeners registered to receive data from
93 void OnDataAvailable(const std::string
& socket_path
,
96 // Notifies listeners of errors reading from the socket and closes it.
97 void OnError(const std::string
& socket_path
, int err
);
98 void OnEOF(const std::string
& socket_path
);
107 DeviceSocketListeners observers
;
108 scoped_ptr
<base::MessagePumpLibevent::FileDescriptorWatcher
> controller
;
109 scoped_ptr
<DeviceSocketReader
> watcher
;
112 static void DeleteOnFILE(DeviceSocketManager
* manager
) { delete manager
; }
114 DeviceSocketManager(scoped_refptr
<base::TaskRunner
> file_task_runner
)
115 : file_task_runner_(file_task_runner
) {}
117 ~DeviceSocketManager() {
118 STLDeleteContainerPairSecondPointers(socket_data_
.begin(),
122 void ScheduleDelete();
124 void StartListeningOnFILE(const std::string
& socket_path
,
126 DeviceSocketListener
* listener
);
128 void StopListeningOnFILE(const std::string
& socket_path
,
129 DeviceSocketListener
* listener
);
131 void CloseSocket(const std::string
& socket_path
);
133 std::map
<std::string
, SocketData
*> socket_data_
;
134 scoped_refptr
<base::TaskRunner
> file_task_runner_
;
136 DISALLOW_COPY_AND_ASSIGN(DeviceSocketManager
);
139 ////////////////////////////////////////////////////////////////////////////////
140 // DeviceSocketReader
142 void DeviceSocketReader::OnFileCanReadWithoutBlocking(int fd
) {
143 ssize_t read_size
= recv(fd
, data_
.get(), data_size_
, 0);
147 DeviceSocketManager::GetInstance()->OnError(socket_path_
, errno
);
150 if (read_size
== 0) {
151 DeviceSocketManager::GetInstance()->OnEOF(socket_path_
);
154 if (read_size
!= static_cast<ssize_t
>(data_size_
))
156 DeviceSocketManager::GetInstance()->OnDataAvailable(socket_path_
,
160 void DeviceSocketReader::OnFileCanWriteWithoutBlocking(int fd
) {
164 ////////////////////////////////////////////////////////////////////////////////
165 // DeviceSocketManager
167 void DeviceSocketManager::StartListening(const std::string
& socket_path
,
169 DeviceSocketListener
* listener
) {
170 file_task_runner_
->PostTask(
172 base::Bind(&DeviceSocketManager::StartListeningOnFILE
,
173 base::Unretained(this),
179 void DeviceSocketManager::StopListening(const std::string
& socket_path
,
180 DeviceSocketListener
* listener
) {
181 file_task_runner_
->PostTask(
183 base::Bind(&DeviceSocketManager::StopListeningOnFILE
,
184 base::Unretained(this),
189 void DeviceSocketManager::OnDataAvailable(const std::string
& socket_path
,
191 CHECK_GT(socket_data_
.count(socket_path
), 0UL);
192 DeviceSocketListeners
& listeners
= socket_data_
[socket_path
]->observers
;
194 DeviceSocketListener
, listeners
, OnDataAvailableOnFILE(data
));
197 void DeviceSocketManager::CloseSocket(const std::string
& socket_path
) {
198 if (!socket_data_
.count(socket_path
))
200 SocketData
* socket_data
= socket_data_
[socket_path
];
201 close(socket_data
->fd
);
203 socket_data_
.erase(socket_path
);
206 void DeviceSocketManager::OnError(const std::string
& socket_path
, int err
) {
207 LOG(ERROR
) << "Error reading from socket: " << socket_path
<< ": "
209 CloseSocket(socket_path
);
210 // TODO(flackr): Notify listeners that the socket was closed unexpectedly.
213 void DeviceSocketManager::OnEOF(const std::string
& socket_path
) {
214 LOG(ERROR
) << "EOF reading from socket: " << socket_path
;
215 CloseSocket(socket_path
);
218 void DeviceSocketManager::StartListeningOnFILE(const std::string
& socket_path
,
220 DeviceSocketListener
* listener
) {
221 CHECK(file_task_runner_
->RunsTasksOnCurrentThread());
222 SocketData
* socket_data
= NULL
;
223 if (!socket_data_
.count(socket_path
)) {
225 if (!IPC::CreateClientUnixDomainSocket(base::FilePath(socket_path
),
227 LOG(ERROR
) << "Error connecting to socket: " << socket_path
;
231 socket_data
= new SocketData
;
232 socket_data_
[socket_path
] = socket_data
;
234 socket_data
->fd
= socket_fd
;
236 socket_data
->controller
.reset(
237 new base::MessagePumpLibevent::FileDescriptorWatcher());
238 socket_data
->watcher
.reset(
239 new DeviceSocketReader(socket_path
, data_size
));
241 base::MessageLoopForIO::current()->WatchFileDescriptor(
244 base::MessageLoopForIO::WATCH_READ
,
245 socket_data
->controller
.get(),
246 socket_data
->watcher
.get());
248 socket_data
= socket_data_
[socket_path
];
250 socket_data
->observers
.AddObserver(listener
);
253 void DeviceSocketManager::StopListeningOnFILE(const std::string
& socket_path
,
254 DeviceSocketListener
* listener
) {
255 if (!socket_data_
.count(socket_path
))
256 return; // Happens if unable to create a socket.
258 CHECK(file_task_runner_
->RunsTasksOnCurrentThread());
259 DeviceSocketListeners
& listeners
= socket_data_
[socket_path
]->observers
;
260 listeners
.RemoveObserver(listener
);
261 if (!listeners
.might_have_observers()) {
262 // All listeners for this socket has been removed. Close the socket.
263 CloseSocket(socket_path
);
267 void DeviceSocketManager::ScheduleDelete() {
268 // Schedule a task to delete on FILE thread because
269 // there may be a task scheduled on |file_task_runner_|.
270 file_task_runner_
->PostTask(
272 base::Bind(&DeleteOnFILE
, base::Unretained(this)));
277 DeviceSocketListener::DeviceSocketListener(const std::string
& socket_path
,
279 : socket_path_(socket_path
),
280 data_size_(data_size
) {
283 DeviceSocketListener::~DeviceSocketListener() {
288 void DeviceSocketListener::CreateSocketManager(
289 scoped_refptr
<base::TaskRunner
> file_task_runner
) {
290 DeviceSocketManager::Create(file_task_runner
);
294 void DeviceSocketListener::ShutdownSocketManager() {
295 DeviceSocketManager::Shutdown();
298 void DeviceSocketListener::StartListening() {
299 DeviceSocketManager::GetInstance()->StartListening(socket_path_
,
304 void DeviceSocketListener::StopListening() {
305 DeviceSocketManager
* instance
= DeviceSocketManager::GetInstanceUnsafe();
307 instance
->StopListening(socket_path_
, this);
310 } // namespace athena