2 * Copyright 2012-2013 Haiku, Inc. All rights reserved.
3 * Distributed under the terms of the MIT License.
6 * Paweł Dziepak, pdziepak@quarnos.org
10 #include "RPCCallbackServer.h"
13 #include "RPCCallback.h"
14 #include "RPCCallbackReply.h"
15 #include "RPCCallbackRequest.h"
16 #include "RPCServer.h"
22 CallbackServer
* gRPCCallbackServer
= NULL
;
23 CallbackServer
* gRPCCallbackServer6
= NULL
;
26 CallbackServer::CallbackServer(int networkFamily
)
28 fConnectionList(NULL
),
30 fThreadRunning(false),
34 fNetworkFamily(networkFamily
)
36 mutex_init(&fConnectionLock
, NULL
);
37 mutex_init(&fThreadLock
, NULL
);
38 rw_lock_init(&fArrayLock
, NULL
);
42 CallbackServer::~CallbackServer()
47 rw_lock_destroy(&fArrayLock
);
48 mutex_destroy(&fThreadLock
);
49 mutex_destroy(&fConnectionLock
);
54 CallbackServer::Get(Server
* server
)
56 ASSERT(server
!= NULL
);
58 int family
= server
->ID().Family();
59 ASSERT(family
== AF_INET
|| family
== AF_INET6
);
73 MutexLocker
_(fServerCreationLock
);
74 if (fServers
[idx
] == NULL
)
75 fServers
[idx
] = new CallbackServer(family
);
81 CallbackServer::ShutdownAll()
83 MutexLocker
_(fServerCreationLock
);
84 for (unsigned int i
= 0; i
< sizeof(fServers
) / sizeof(fServers
[0]); i
++)
86 memset(&fServers
, 0, sizeof(fServers
));
90 mutex
CallbackServer::fServerCreationLock
= MUTEX_INITIALIZER(NULL
);
91 CallbackServer
* CallbackServer::fServers
[2] = { NULL
, NULL
};
95 CallbackServer::RegisterCallback(Callback
* callback
)
97 ASSERT(callback
!= NULL
);
99 status_t result
= StartServer();
103 WriteLocker
_(fArrayLock
);
104 if (fFreeSlot
== -1) {
105 uint32 newSize
= max_c(fArraySize
* 2, 4);
106 uint32 size
= newSize
* sizeof(CallbackSlot
);
107 CallbackSlot
* array
= reinterpret_cast<CallbackSlot
*>(malloc(size
));
111 if (fCallbackArray
!= NULL
)
112 memcpy(array
, fCallbackArray
, fArraySize
* sizeof(CallbackSlot
));
114 for (uint32 i
= fArraySize
; i
< newSize
; i
++)
115 array
[i
].fNext
= i
+ 1;
117 array
[newSize
- 1].fNext
= -1;
119 fCallbackArray
= array
;
120 fFreeSlot
= fArraySize
;
121 fArraySize
= newSize
;
124 int32 id
= fFreeSlot
;
125 fFreeSlot
= fCallbackArray
[id
].fNext
;
127 fCallbackArray
[id
].fCallback
= callback
;
129 callback
->SetCBServer(this);
136 CallbackServer::UnregisterCallback(Callback
* callback
)
138 ASSERT(callback
!= NULL
);
139 ASSERT(callback
->CBServer() == this);
141 int32 id
= callback
->ID();
143 WriteLocker
_(fArrayLock
);
144 fCallbackArray
[id
].fNext
= fFreeSlot
;
147 callback
->SetCBServer(NULL
);
153 CallbackServer::StartServer()
155 MutexLocker
_(fThreadLock
);
159 status_t result
= ConnectionListener::Listen(&fListener
, fNetworkFamily
);
163 fThread
= spawn_kernel_thread(&CallbackServer::ListenerThreadLauncher
,
164 "NFSv4 Callback Listener", B_NORMAL_PRIORITY
, this);
168 fThreadRunning
= true;
170 result
= resume_thread(fThread
);
171 if (result
!= B_OK
) {
172 kill_thread(fThread
);
173 fThreadRunning
= false;
182 CallbackServer::StopServer()
184 MutexLocker
_(&fThreadLock
);
188 fListener
->Disconnect();
190 wait_for_thread(fThread
, &result
);
192 MutexLocker
locker(fConnectionLock
);
193 while (fConnectionList
!= NULL
) {
194 ConnectionEntry
* entry
= fConnectionList
;
195 fConnectionList
= entry
->fNext
;
196 entry
->fConnection
->Disconnect();
199 wait_for_thread(entry
->fThread
, &result
);
201 delete entry
->fConnection
;
207 fThreadRunning
= false;
213 CallbackServer::NewConnection(Connection
* connection
)
215 ASSERT(connection
!= NULL
);
217 ConnectionEntry
* entry
= new ConnectionEntry
;
218 entry
->fConnection
= connection
;
221 MutexLocker
locker(fConnectionLock
);
222 entry
->fNext
= fConnectionList
;
223 if (fConnectionList
!= NULL
)
224 fConnectionList
->fPrev
= entry
;
225 fConnectionList
= entry
;
228 void** arguments
= reinterpret_cast<void**>(malloc(sizeof(void*) * 2));
229 if (arguments
== NULL
)
233 arguments
[1] = entry
;
236 thread
= spawn_kernel_thread(&CallbackServer::ConnectionThreadLauncher
,
237 "NFSv4 Callback Connection", B_NORMAL_PRIORITY
, arguments
);
239 ReleaseConnection(entry
);
244 entry
->fThread
= thread
;
246 status_t result
= resume_thread(thread
);
247 if (result
!= B_OK
) {
249 ReleaseConnection(entry
);
259 CallbackServer::ReleaseConnection(ConnectionEntry
* entry
)
261 ASSERT(entry
!= NULL
);
263 MutexLocker
_(fConnectionLock
);
264 if (entry
->fNext
!= NULL
)
265 entry
->fNext
->fPrev
= entry
->fPrev
;
266 if (entry
->fPrev
!= NULL
)
267 entry
->fPrev
->fNext
= entry
->fNext
;
269 fConnectionList
= entry
->fNext
;
271 delete entry
->fConnection
;
278 CallbackServer::ConnectionThreadLauncher(void* object
)
280 ASSERT(object
!= NULL
);
282 void** objects
= reinterpret_cast<void**>(object
);
283 CallbackServer
* server
= reinterpret_cast<CallbackServer
*>(objects
[0]);
284 ConnectionEntry
* entry
= reinterpret_cast<ConnectionEntry
*>(objects
[1]);
287 return server
->ConnectionThread(entry
);
292 CallbackServer::ConnectionThread(ConnectionEntry
* entry
)
294 ASSERT(entry
!= NULL
);
296 Connection
* connection
= entry
->fConnection
;
297 CallbackReply
* reply
;
299 while (fThreadRunning
) {
302 status_t result
= connection
->Receive(&buffer
, &size
);
303 if (result
!= B_OK
) {
304 if (result
!= ECONNABORTED
)
305 ReleaseConnection(entry
);
309 CallbackRequest
* request
310 = new(std::nothrow
) CallbackRequest(buffer
, size
);
311 if (request
== NULL
) {
314 } else if (request
->Error() != B_OK
) {
315 reply
= CallbackReply::Create(request
->XID(), request
->RPCError());
317 connection
->Send(reply
->Stream().Buffer(),
318 reply
->Stream().Size());
325 switch (request
->Procedure()) {
326 case CallbackProcCompound
:
327 GetCallback(request
->ID())->EnqueueRequest(request
, connection
);
330 case CallbackProcNull
:
331 reply
= CallbackReply::Create(request
->XID());
333 connection
->Send(reply
->Stream().Buffer(),
334 reply
->Stream().Size());
348 CallbackServer::ListenerThreadLauncher(void* object
)
350 ASSERT(object
!= NULL
);
352 CallbackServer
* server
= reinterpret_cast<CallbackServer
*>(object
);
353 return server
->ListenerThread();
358 CallbackServer::ListenerThread()
360 while (fThreadRunning
) {
361 Connection
* connection
;
363 status_t result
= fListener
->AcceptConnection(&connection
);
364 if (result
!= B_OK
) {
365 fThreadRunning
= false;
368 result
= NewConnection(connection
);