BPicture: Fix archive constructor.
[haiku.git] / src / add-ons / kernel / file_systems / nfs4 / RPCCallbackServer.cpp
blob0c448480bf8fdca6a931acc03f867141ddeb8150
1 /*
2 * Copyright 2012-2013 Haiku, Inc. All rights reserved.
3 * Distributed under the terms of the MIT License.
5 * Authors:
6 * Paweł Dziepak, pdziepak@quarnos.org
7 */
10 #include "RPCCallbackServer.h"
12 #include "NFS4Defs.h"
13 #include "RPCCallback.h"
14 #include "RPCCallbackReply.h"
15 #include "RPCCallbackRequest.h"
16 #include "RPCServer.h"
19 using namespace RPC;
22 CallbackServer* gRPCCallbackServer = NULL;
23 CallbackServer* gRPCCallbackServer6 = NULL;
26 CallbackServer::CallbackServer(int networkFamily)
28 fConnectionList(NULL),
29 fListener(NULL),
30 fThreadRunning(false),
31 fCallbackArray(NULL),
32 fArraySize(0),
33 fFreeSlot(-1),
34 fNetworkFamily(networkFamily)
36 mutex_init(&fConnectionLock, NULL);
37 mutex_init(&fThreadLock, NULL);
38 rw_lock_init(&fArrayLock, NULL);
42 CallbackServer::~CallbackServer()
44 StopServer();
46 free(fCallbackArray);
47 rw_lock_destroy(&fArrayLock);
48 mutex_destroy(&fThreadLock);
49 mutex_destroy(&fConnectionLock);
53 CallbackServer*
54 CallbackServer::Get(Server* server)
56 ASSERT(server != NULL);
58 int family = server->ID().Family();
59 ASSERT(family == AF_INET || family == AF_INET6);
61 int idx;
62 switch (family) {
63 case AF_INET:
64 idx = 0;
65 break;
66 case AF_INET6:
67 idx = 1;
68 break;
69 default:
70 return NULL;
73 MutexLocker _(fServerCreationLock);
74 if (fServers[idx] == NULL)
75 fServers[idx] = new CallbackServer(family);
76 return fServers[idx];
80 void
81 CallbackServer::ShutdownAll()
83 MutexLocker _(fServerCreationLock);
84 for (unsigned int i = 0; i < sizeof(fServers) / sizeof(fServers[0]); i++)
85 delete fServers[i];
86 memset(&fServers, 0, sizeof(fServers));
90 mutex CallbackServer::fServerCreationLock = MUTEX_INITIALIZER(NULL);
91 CallbackServer* CallbackServer::fServers[2] = { NULL, NULL };
94 status_t
95 CallbackServer::RegisterCallback(Callback* callback)
97 ASSERT(callback != NULL);
99 status_t result = StartServer();
100 if (result != B_OK)
101 return result;
103 WriteLocker _(fArrayLock);
104 if (fFreeSlot == -1) {
105 uint32 newSize = max_c(fArraySize * 2, 4);
106 uint32 size = newSize * sizeof(CallbackSlot);
107 CallbackSlot* array = reinterpret_cast<CallbackSlot*>(malloc(size));
108 if (array == NULL)
109 return B_NO_MEMORY;
111 if (fCallbackArray != NULL)
112 memcpy(array, fCallbackArray, fArraySize * sizeof(CallbackSlot));
114 for (uint32 i = fArraySize; i < newSize; i++)
115 array[i].fNext = i + 1;
117 array[newSize - 1].fNext = -1;
119 fCallbackArray = array;
120 fFreeSlot = fArraySize;
121 fArraySize = newSize;
124 int32 id = fFreeSlot;
125 fFreeSlot = fCallbackArray[id].fNext;
127 fCallbackArray[id].fCallback = callback;
128 callback->SetID(id);
129 callback->SetCBServer(this);
131 return B_OK;
135 status_t
136 CallbackServer::UnregisterCallback(Callback* callback)
138 ASSERT(callback != NULL);
139 ASSERT(callback->CBServer() == this);
141 int32 id = callback->ID();
143 WriteLocker _(fArrayLock);
144 fCallbackArray[id].fNext = fFreeSlot;
145 fFreeSlot = id;
147 callback->SetCBServer(NULL);
148 return B_OK;
152 status_t
153 CallbackServer::StartServer()
155 MutexLocker _(fThreadLock);
156 if (fThreadRunning)
157 return B_OK;
159 status_t result = ConnectionListener::Listen(&fListener, fNetworkFamily);
160 if (result != B_OK)
161 return result;
163 fThread = spawn_kernel_thread(&CallbackServer::ListenerThreadLauncher,
164 "NFSv4 Callback Listener", B_NORMAL_PRIORITY, this);
165 if (fThread < B_OK)
166 return fThread;
168 fThreadRunning = true;
170 result = resume_thread(fThread);
171 if (result != B_OK) {
172 kill_thread(fThread);
173 fThreadRunning = false;
174 return result;
177 return B_OK;
181 status_t
182 CallbackServer::StopServer()
184 MutexLocker _(&fThreadLock);
185 if (!fThreadRunning)
186 return B_OK;
188 fListener->Disconnect();
189 status_t result;
190 wait_for_thread(fThread, &result);
192 MutexLocker locker(fConnectionLock);
193 while (fConnectionList != NULL) {
194 ConnectionEntry* entry = fConnectionList;
195 fConnectionList = entry->fNext;
196 entry->fConnection->Disconnect();
198 status_t result;
199 wait_for_thread(entry->fThread, &result);
201 delete entry->fConnection;
202 delete entry;
205 delete fListener;
207 fThreadRunning = false;
208 return B_OK;
212 status_t
213 CallbackServer::NewConnection(Connection* connection)
215 ASSERT(connection != NULL);
217 ConnectionEntry* entry = new ConnectionEntry;
218 entry->fConnection = connection;
219 entry->fPrev = NULL;
221 MutexLocker locker(fConnectionLock);
222 entry->fNext = fConnectionList;
223 if (fConnectionList != NULL)
224 fConnectionList->fPrev = entry;
225 fConnectionList = entry;
226 locker.Unlock();
228 void** arguments = reinterpret_cast<void**>(malloc(sizeof(void*) * 2));
229 if (arguments == NULL)
230 return B_NO_MEMORY;
232 arguments[0] = this;
233 arguments[1] = entry;
235 thread_id thread;
236 thread = spawn_kernel_thread(&CallbackServer::ConnectionThreadLauncher,
237 "NFSv4 Callback Connection", B_NORMAL_PRIORITY, arguments);
238 if (thread < B_OK) {
239 ReleaseConnection(entry);
240 free(arguments);
241 return thread;
244 entry->fThread = thread;
246 status_t result = resume_thread(thread);
247 if (result != B_OK) {
248 kill_thread(thread);
249 ReleaseConnection(entry);
250 free(arguments);
251 return result;
254 return B_OK;
258 status_t
259 CallbackServer::ReleaseConnection(ConnectionEntry* entry)
261 ASSERT(entry != NULL);
263 MutexLocker _(fConnectionLock);
264 if (entry->fNext != NULL)
265 entry->fNext->fPrev = entry->fPrev;
266 if (entry->fPrev != NULL)
267 entry->fPrev->fNext = entry->fNext;
268 else
269 fConnectionList = entry->fNext;
271 delete entry->fConnection;
272 delete entry;
273 return B_OK;
277 status_t
278 CallbackServer::ConnectionThreadLauncher(void* object)
280 ASSERT(object != NULL);
282 void** objects = reinterpret_cast<void**>(object);
283 CallbackServer* server = reinterpret_cast<CallbackServer*>(objects[0]);
284 ConnectionEntry* entry = reinterpret_cast<ConnectionEntry*>(objects[1]);
285 free(objects);
287 return server->ConnectionThread(entry);
291 status_t
292 CallbackServer::ConnectionThread(ConnectionEntry* entry)
294 ASSERT(entry != NULL);
296 Connection* connection = entry->fConnection;
297 CallbackReply* reply;
299 while (fThreadRunning) {
300 uint32 size;
301 void* buffer;
302 status_t result = connection->Receive(&buffer, &size);
303 if (result != B_OK) {
304 if (result != ECONNABORTED)
305 ReleaseConnection(entry);
306 return result;
309 CallbackRequest* request
310 = new(std::nothrow) CallbackRequest(buffer, size);
311 if (request == NULL) {
312 free(buffer);
313 continue;
314 } else if (request->Error() != B_OK) {
315 reply = CallbackReply::Create(request->XID(), request->RPCError());
316 if (reply != NULL) {
317 connection->Send(reply->Stream().Buffer(),
318 reply->Stream().Size());
319 delete reply;
321 delete request;
322 continue;
325 switch (request->Procedure()) {
326 case CallbackProcCompound:
327 GetCallback(request->ID())->EnqueueRequest(request, connection);
328 break;
330 case CallbackProcNull:
331 reply = CallbackReply::Create(request->XID());
332 if (reply != NULL) {
333 connection->Send(reply->Stream().Buffer(),
334 reply->Stream().Size());
335 delete reply;
338 default:
339 delete request;
343 return B_OK;
347 status_t
348 CallbackServer::ListenerThreadLauncher(void* object)
350 ASSERT(object != NULL);
352 CallbackServer* server = reinterpret_cast<CallbackServer*>(object);
353 return server->ListenerThread();
357 status_t
358 CallbackServer::ListenerThread()
360 while (fThreadRunning) {
361 Connection* connection;
363 status_t result = fListener->AcceptConnection(&connection);
364 if (result != B_OK) {
365 fThreadRunning = false;
366 return result;
368 result = NewConnection(connection);
369 if (result != B_OK)
370 delete connection;
373 return B_OK;