2 * Copyright 2012 Haiku, Inc. All rights reserved.
3 * Distributed under the terms of the MIT License.
6 * Paweł Dziepak, pdziepak@quarnos.org
10 #include "RPCServer.h"
14 #include <util/AutoLock.h>
15 #include <util/Random.h>
17 #include "RPCCallbackServer.h"
24 RequestManager::RequestManager()
29 mutex_init(&fLock
, NULL
);
33 RequestManager::~RequestManager()
35 mutex_destroy(&fLock
);
40 RequestManager::AddRequest(Request
* request
)
42 ASSERT(request
!= NULL
);
45 if (fQueueTail
!= NULL
)
46 fQueueTail
->fNext
= request
;
50 request
->fNext
= NULL
;
55 RequestManager::FindRequest(uint32 xid
)
58 Request
* req
= fQueueHead
;
61 if (req
->fXID
== xid
) {
63 prev
->fNext
= req
->fNext
;
64 if (fQueueTail
== req
)
66 if (fQueueHead
== req
)
67 fQueueHead
= req
->fNext
;
80 Server::Server(Connection
* connection
, PeerAddress
* address
)
82 fConnection(connection
),
87 fXID(get_random
<uint32
>())
89 ASSERT(connection
!= NULL
);
90 ASSERT(address
!= NULL
);
92 mutex_init(&fCallbackLock
, NULL
);
93 mutex_init(&fRepairLock
, NULL
);
101 if (fCallback
!= NULL
)
102 fCallback
->CBServer()->UnregisterCallback(fCallback
);
104 mutex_destroy(&fCallbackLock
);
105 mutex_destroy(&fRepairLock
);
109 fThreadCancel
= true;
110 fConnection
->Disconnect();
113 wait_for_thread(fThread
, &result
);
120 Server::_StartListening()
122 fThreadCancel
= false;
124 fThread
= spawn_kernel_thread(&Server::_ListenerThreadStart
,
125 "NFSv4 Listener", B_NORMAL_PRIORITY
, this);
129 status_t result
= resume_thread(fThread
);
130 if (result
!= B_OK
) {
131 kill_thread(fThread
);
140 Server::SendCallAsync(Call
* call
, Reply
** reply
, Request
** request
)
142 ASSERT(call
!= NULL
);
143 ASSERT(reply
!= NULL
);
144 ASSERT(request
!= NULL
);
146 if (fThreadError
!= B_OK
&& Repair() != B_OK
)
149 Request
* req
= new(std::nothrow
) Request
;
153 uint32 xid
= _GetXID();
157 req
->fEvent
.Init(&req
->fEvent
, NULL
);
162 fRequests
.AddRequest(req
);
165 status_t error
= ResendCallAsync(call
, req
);
173 Server::ResendCallAsync(Call
* call
, Request
* request
)
175 ASSERT(call
!= NULL
);
176 ASSERT(request
!= NULL
);
178 if (fThreadError
!= B_OK
&& Repair() != B_OK
) {
179 fRequests
.FindRequest(request
->fXID
);
183 XDR::WriteStream
& stream
= call
->Stream();
184 status_t result
= fConnection
->Send(stream
.Buffer(), stream
.Size());
185 if (result
!= B_OK
) {
186 fRequests
.FindRequest(request
->fXID
);
195 Server::WakeCall(Request
* request
)
197 ASSERT(request
!= NULL
);
199 Request
* req
= fRequests
.FindRequest(request
->fXID
);
203 request
->fError
= B_FILE_ERROR
;
204 *request
->fReply
= NULL
;
205 request
->fDone
= true;
206 request
->fEvent
.NotifyAll();
215 uint32 thisRepair
= fRepairCount
;
217 MutexLocker
_(fRepairLock
);
218 if (fRepairCount
!= thisRepair
)
221 fThreadCancel
= true;
223 status_t result
= fConnection
->Reconnect();
227 wait_for_thread(fThread
, &result
);
228 result
= _StartListening();
238 Server::GetCallback()
240 MutexLocker
_(fCallbackLock
);
242 if (fCallback
== NULL
) {
243 fCallback
= new(std::nothrow
) Callback(this);
244 if (fCallback
== NULL
)
247 CallbackServer
* server
= CallbackServer::Get(this);
248 if (server
== NULL
) {
253 if (server
->RegisterCallback(fCallback
) != B_OK
) {
266 return static_cast<uint32
>(atomic_add(&fXID
, 1));
277 while (!fThreadCancel
) {
278 result
= fConnection
->Receive(&buffer
, &size
);
279 if (result
== B_NO_MEMORY
)
281 else if (result
!= B_OK
) {
282 fThreadError
= result
;
286 ASSERT(buffer
!= NULL
&& size
> 0);
287 Reply
* reply
= new(std::nothrow
) Reply(buffer
, size
);
293 Request
* req
= fRequests
.FindRequest(reply
->GetXID());
295 *req
->fReply
= reply
;
297 req
->fEvent
.NotifyAll();
307 Server::_ListenerThreadStart(void* object
)
309 ASSERT(object
!= NULL
);
311 Server
* server
= reinterpret_cast<Server
*>(object
);
312 return server
->_Listener();
316 ServerManager::ServerManager()
320 mutex_init(&fLock
, NULL
);
324 ServerManager::~ServerManager()
326 mutex_destroy(&fLock
);
331 ServerManager::Acquire(Server
** _server
, AddressResolver
* resolver
,
332 ProgramData
* (*createPrivateData
)(Server
*))
337 while ((result
= resolver
->GetNextAddress(&address
)) == B_OK
) {
338 result
= _Acquire(_server
, address
, createPrivateData
);
348 ServerManager::_Acquire(Server
** _server
, const PeerAddress
& address
,
349 ProgramData
* (*createPrivateData
)(Server
*))
351 ASSERT(_server
!= NULL
);
352 ASSERT(createPrivateData
!= NULL
);
356 MutexLocker
locker(fLock
);
357 ServerNode
* node
= _Find(address
);
360 *_server
= node
->fServer
;
365 node
= new(std::nothrow
) ServerNode
;
372 result
= Connection::Connect(&conn
, address
);
373 if (result
!= B_OK
) {
378 node
->fServer
= new Server(conn
, &node
->fID
);
379 if (node
->fServer
== NULL
) {
384 node
->fServer
->SetPrivateData(createPrivateData(node
->fServer
));
387 node
->fLeft
= node
->fRight
= NULL
;
389 ServerNode
* nd
= _Insert(node
);
393 delete node
->fServer
;
395 *_server
= nd
->fServer
;
399 *_server
= node
->fServer
;
405 ServerManager::Release(Server
* server
)
407 ASSERT(server
!= NULL
);
409 MutexLocker
_(fLock
);
410 ServerNode
* node
= _Find(server
->ID());
414 if (node
->fRefCount
== 0) {
416 delete node
->fServer
;
424 ServerManager::_Find(const PeerAddress
& address
)
426 ServerNode
* node
= fRoot
;
427 while (node
!= NULL
) {
428 if (node
->fID
== address
)
430 if (node
->fID
< address
)
441 ServerManager::_Delete(ServerNode
* node
)
443 ASSERT(node
!= NULL
);
446 ServerNode
* previous
= NULL
;
447 ServerNode
* current
= fRoot
;
448 while (current
!= NULL
) {
449 if (current
->fID
== node
->fID
) {
454 if (current
->fID
< node
->fID
) {
456 current
= current
->fRight
;
459 current
= current
->fLeft
;
466 if (previous
== NULL
)
468 else if (current
->fLeft
== NULL
&& current
->fRight
== NULL
) {
469 if (previous
->fID
< node
->fID
)
470 previous
->fRight
= NULL
;
472 previous
->fLeft
= NULL
;
473 } else if (current
->fLeft
!= NULL
&& current
->fRight
== NULL
) {
474 if (previous
->fID
< node
->fID
)
475 previous
->fRight
= current
->fLeft
;
477 previous
->fLeft
= current
->fLeft
;
478 } else if (current
->fLeft
== NULL
&& current
->fRight
!= NULL
) {
479 if (previous
->fID
< node
->fID
)
480 previous
->fRight
= current
->fRight
;
482 previous
->fLeft
= current
->fRight
;
484 ServerNode
* left_prev
= current
;
485 ServerNode
* left
= current
->fLeft
;
487 while (left
->fLeft
!= NULL
) {
492 if (previous
->fID
< node
->fID
)
493 previous
->fRight
= left
;
495 previous
->fLeft
= left
;
498 left_prev
->fLeft
= NULL
;
504 ServerManager::_Insert(ServerNode
* node
)
506 ASSERT(node
!= NULL
);
508 ServerNode
* previous
= NULL
;
509 ServerNode
* current
= fRoot
;
510 while (current
!= NULL
) {
511 if (current
->fID
== node
->fID
)
513 if (current
->fID
< node
->fID
) {
515 current
= current
->fRight
;
518 current
= current
->fLeft
;
522 if (previous
== NULL
)
524 else if (previous
->fID
< node
->fID
)
525 previous
->fRight
= node
;
527 previous
->fLeft
= node
;