Changed output and intermediate directories, tuned up compile parameters for Windows...
[pwlib.git] / src / ptclib / sockagg.cxx
blob17d676d7fcdab1da74e4bd7e58275eab50f3fd66
1 /*
2 * sockagg.cxx
4 * Generalised Socket Aggregation functions
6 * Portable Windows Library
8 * Copyright (C) 2005 Post Increment
10 * The contents of this file are subject to the Mozilla Public License
11 * Version 1.0 (the "License"); you may not use this file except in
12 * compliance with the License. You may obtain a copy of the License at
13 * http://www.mozilla.org/MPL/
15 * Software distributed under the License is distributed on an "AS IS"
16 * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
17 * the License for the specific language governing rights and limitations
18 * under the License.
20 * The Original Code is Portable Windows Library.
22 * The Initial Developer of the Original Code is Post Increment
24 * Portions of this code were written with the financial assistance of
25 * Metreos Corporation (http://www.metros.com).
27 * Contributor(s): ______________________________________.
29 * $Log$
30 * Revision 1.19 2006/11/20 03:17:24 csoutheren
31 * Fixed incorrect check time on trace message
33 * Revision 1.18 2006/07/22 06:27:58 rjongbloed
34 * Added auo-load of Winsock v2 library required by agreggated sockets.
36 * Revision 1.17 2006/03/13 23:34:21 csoutheren
37 * Added log message when handle creates aggregator
39 * Revision 1.16 2006/03/09 05:32:59 csoutheren
40 * Reverted to conservative locking strategy, with OnClose
42 * Revision 1.15 2006/03/07 11:04:56 csoutheren
43 * Ensure socket aggregation not used on Linux
45 * Revision 1.14 2006/03/07 07:38:02 csoutheren
46 * Refine timing windows on socket handling and cleanup unused code
48 * Revision 1.13 2006/03/06 02:37:26 csoutheren
49 * Change handle locking to help prevent aggregation threads from hogging list
50 * access
52 * Revision 1.12 2006/03/02 07:50:38 csoutheren
53 * Cleanup unused code
54 * Add OnClose function
56 * Revision 1.11 2006/02/28 02:26:00 csoutheren
57 * Renamed variable to be not same as member
59 * Revision 1.10 2006/02/28 02:08:02 csoutheren
60 * Modified aggregation to load balance better
62 * Revision 1.9 2006/02/08 04:02:25 csoutheren
63 * Added ability to enable and disable socket aggregation
65 * Revision 1.8 2006/01/23 05:57:39 csoutheren
66 * More aggegator implementation
68 * Revision 1.7 2006/01/18 07:16:56 csoutheren
69 * Latest version of socket aggregation code
71 * Revision 1.6 2006/01/05 11:39:32 rjongbloed
72 * Fixed DevStudio warning
74 * Revision 1.5 2006/01/03 04:23:32 csoutheren
75 * Fixed Unix implementation
77 * Revision 1.4 2005/12/23 07:49:27 csoutheren
78 * Fixed Unix implementation
80 * Revision 1.3 2005/12/23 06:44:31 csoutheren
81 * Working implementation
83 * Revision 1.2 2005/12/22 07:27:36 csoutheren
84 * More implementation
86 * Revision 1.1 2005/12/22 03:55:52 csoutheren
87 * Added initial version of socket aggregation classes
91 #ifdef __GNUC__
92 #pragma implementation "sockagg.h"
93 #endif
96 #include <ptlib.h>
97 #include <ptclib/sockagg.h>
99 #include <fcntl.h>
101 ////////////////////////////////////////////////////////////////
103 #if _WIN32
105 #pragma comment(lib, "Ws2_32.lib")
107 class LocalEvent : public PHandleAggregator::EventBase
109 public:
110 LocalEvent()
112 event = CreateEvent(NULL, TRUE, FALSE,NULL);
113 PAssert(event != NULL, "CreateEvent failed");
116 ~LocalEvent()
117 { CloseHandle(event); }
119 PAggregatorFD::FD GetHandle()
120 { return event; }
122 void Set()
123 { SetEvent(event); }
125 void Reset()
126 { ResetEvent(event); }
128 protected:
129 HANDLE event;
132 PAggregatorFD::PAggregatorFD(SOCKET v)
133 : socket(v)
135 fd = WSACreateEvent();
136 PAssert(WSAEventSelect(socket, fd, FD_READ | FD_CLOSE) == 0, "WSAEventSelect failed");
139 PAggregatorFD::~PAggregatorFD()
141 WSACloseEvent(fd);
144 bool PAggregatorFD::IsValid()
146 return socket != INVALID_SOCKET;
149 #else // #if _WIN32
151 class LocalEvent : public PHandleAggregator::EventBase
153 public:
154 LocalEvent()
155 { ::pipe(fds); }
157 virtual ~LocalEvent()
159 close(fds[0]);
160 close(fds[1]);
163 PAggregatorFD::FD GetHandle()
164 { return fds[0]; }
166 void Set()
167 { char ch; write(fds[1], &ch, 1); }
169 void Reset()
170 { char ch; read(fds[0], &ch, 1); }
172 protected:
173 int fds[2];
176 PAggregatorFD::PAggregatorFD(int v)
177 : fd(v)
181 PAggregatorFD::~PAggregatorFD()
185 bool PAggregatorFD::IsValid()
187 return fd >= 0;
190 #endif // #endif _WIN32
193 ////////////////////////////////////////////////////////////////
195 PHandleAggregator::WorkerThreadBase::WorkerThreadBase(EventBase & _event)
196 : PThread(100, NoAutoDeleteThread, NormalPriority, "Aggregator:%0x"), event(_event), listChanged(TRUE), shutdown(FALSE)
200 class WorkerThread : public PHandleAggregator::WorkerThreadBase
202 public:
203 WorkerThread()
204 : WorkerThreadBase(localEvent)
207 ~WorkerThread()
211 void Trigger()
212 { localEvent.Set(); }
214 LocalEvent localEvent;
218 ////////////////////////////////////////////////////////////////
220 PHandleAggregator::PHandleAggregator(unsigned _max)
221 : maxWorkerSize(_max)
225 BOOL PHandleAggregator::AddHandle(PAggregatedHandle * handle)
227 // perform the handle init function
228 if (!handle->Init())
229 return FALSE;
231 PWaitAndSignal m(listMutex);
233 // if the maximum number of worker threads has been reached, then
234 // use the worker thread with the minimum number of handles
235 if (workers.size() >= maxWorkerSize) {
236 WorkerList_t::iterator minWorker = workers.end();
237 size_t minSizeFound = 0x7ffff;
238 WorkerList_t::iterator r;
239 for (r = workers.begin(); r != workers.end(); ++r) {
240 WorkerThreadBase & worker = **r;
241 PWaitAndSignal m2(worker.workerMutex);
242 if (!worker.shutdown && (worker.handleList.size() <= minSizeFound)) {
243 minSizeFound = worker.handleList.size();
244 minWorker = r;
248 // add the worker to the thread
249 PAssert(minWorker != workers.end(), "could not find minimum worker");
251 WorkerThreadBase & worker = **minWorker;
252 PWaitAndSignal m2(worker.workerMutex);
253 worker.handleList.push_back(handle);
254 PTRACE(4, "SockAgg\tAdding handle " << (void *)handle << " to aggregator - " << worker.handleList.size() << " handles");
255 worker.listChanged = TRUE;
256 worker.Trigger();
257 return TRUE;
260 PTRACE(4, "SockAgg\tCreating new aggregator for " << (void *)handle);
262 // no worker threads usable, create a new one
263 WorkerThread * worker = new WorkerThread;
264 worker->handleList.push_back(handle);
265 worker->Resume();
266 workers.push_back(worker);
268 PTRACE(4, "SockAgg\tAdding handle " << (void *)handle << " to new aggregator");
270 return TRUE;
273 BOOL PHandleAggregator::RemoveHandle(PAggregatedHandle * handle)
275 listMutex.Wait();
277 // look for the thread containing the handle we need to delete
278 WorkerList_t::iterator r;
279 for (r = workers.begin(); r != workers.end(); ++r) {
280 WorkerThreadBase * worker = *r;
282 // lock the worker
283 worker->workerMutex.Wait();
285 PAggregatedHandleList_t & hList = worker->handleList;
287 // if handle is not in this thread, then continue searching
288 PAggregatedHandleList_t::iterator s = find(hList.begin(), hList.end(), handle);
289 if (s == hList.end()) {
290 worker->workerMutex.Signal();
291 continue;
294 PAssert(*s == handle, "Found handle is not correct!");
296 PAssert(!handle->beingProcessed, "trying to close handle that is in use");
298 // remove the handle from the worker's list of handles
299 worker->handleList.erase(s);
301 // do the de-init action
302 handle->DeInit();
304 // delete the handle if autodelete enabled
305 if (handle->autoDelete)
306 delete handle;
308 // if the worker thread has enough handles to keep running, trigger it to update
309 if (worker->handleList.size() > 0) {
310 PTRACE(4, "SockAgg\tRemoved handle " << (void *)handle << " from aggregator - " << worker->handleList.size() << " handles remaining");
311 worker->listChanged = TRUE;
312 worker->Trigger();
313 worker->workerMutex.Signal();
315 listMutex.Signal();
317 return TRUE;
320 PTRACE(4, "SockAgg\tworker thread empty - closing down");
322 // remove the worker thread from the list of workers
323 workers.erase(r);
325 // shutdown the thread
326 worker->shutdown = TRUE;
327 worker->Trigger();
328 worker->workerMutex.Signal();
330 // unlock the list
331 listMutex.Signal();
333 // the worker is now finished
334 if (!worker->WaitForTermination(10000)) {
335 PTRACE(4, "SockAgg\tWorker did not terminate promptly");
338 delete worker;
340 return TRUE;
343 listMutex.Signal();
345 PAssertAlways("Cannot find aggregator handle");
347 return FALSE;
350 ////////////////////////////////////////////////////////////////
352 typedef std::vector<PAggregatorFD::FD> fdList_t;
354 #ifdef _WIN32
355 #define FDLIST_SIZE WSA_MAXIMUM_WAIT_EVENTS
356 #else
357 #define FDLIST_SIZE 64
358 #endif
360 void PHandleAggregator::WorkerThreadBase::Main()
362 PTRACE(4, "SockAgg\taggregator started");
364 fdList_t fdList;
365 PAggregatorFDList_t aggregatorFdList;
367 typedef std::map<PAggregatorFD::FD, PAggregatedHandle *> PAggregatorFdToHandleMap_t;
368 PAggregatorFdToHandleMap_t aggregatorFdToHandleMap;
370 for (;;) {
372 // create the list of fds to wait on and find minimum timeout
373 PTimeInterval timeout(PMaxTimeInterval);
374 PAggregatedHandle * timeoutHandle = NULL;
376 #ifndef _WIN32
377 fd_set rfds;
378 FD_ZERO(&rfds);
379 int maxFd = 0;
380 #endif
383 PWaitAndSignal m(workerMutex);
385 // check for shutdown
386 if (shutdown)
387 break;
389 // if the list of handles has changed, clear the list of handles
390 if (listChanged) {
391 aggregatorFdList.erase (aggregatorFdList.begin(), aggregatorFdList.end());
392 aggregatorFdList.reserve (FDLIST_SIZE);
393 fdList.erase (fdList.begin(), fdList.end());
394 fdList.reserve (FDLIST_SIZE);
395 aggregatorFdToHandleMap.erase(aggregatorFdToHandleMap.begin(), aggregatorFdToHandleMap.end());
398 PAggregatedHandleList_t::iterator r;
399 for (r = handleList.begin(); r != handleList.end(); ++r) {
400 PAggregatedHandle * handle = *r;
402 if (handle->closed)
403 continue;
405 if (listChanged) {
406 PAggregatorFDList_t fds = handle->GetFDs();
407 PAggregatorFDList_t::iterator s;
408 for (s = fds.begin(); s != fds.end(); ++s) {
409 fdList.push_back ((*s)->fd);
410 aggregatorFdList.push_back((*s));
411 aggregatorFdToHandleMap.insert(PAggregatorFdToHandleMap_t::value_type((*s)->fd, handle));
415 if (!handle->IsPreReadDone()) {
416 handle->PreRead();
417 handle->SetPreReadDone();
420 PTimeInterval t = handle->GetTimeout();
421 if (t < timeout) {
422 timeout = t;
423 timeoutHandle = handle;
427 // add in the event fd
428 if (listChanged) {
429 fdList.push_back(event.GetHandle());
430 listChanged = FALSE;
433 #ifndef _WIN32
434 // create the list of FDs
435 fdList_t::iterator s;
436 for (s = fdList.begin(); s != fdList.end(); ++s) {
437 FD_SET(*s, &rfds);
438 maxFd = PMAX(maxFd, *s);
440 #endif
441 } // workerMutex goes out of scope
443 #ifdef _WIN32
444 DWORD nCount = fdList.size();
445 DWORD ret = WSAWaitForMultipleEvents(nCount,
446 &fdList[0],
447 false,
448 (timeout == PMaxTimeInterval) ? WSA_INFINITE : (DWORD)timeout.GetMilliSeconds(),
449 FALSE);
451 if (ret == WAIT_FAILED) {
452 DWORD err = WSAGetLastError();
453 PTRACE(1, "SockAgg\tWSAWaitForMultipleEvents error " << err);
457 PWaitAndSignal m(workerMutex);
459 // check for shutdown
460 if (shutdown)
461 break;
463 if (ret == WAIT_TIMEOUT) {
464 // make sure the handle did not disappear while we were waiting
465 PAggregatedHandleList_t::iterator s = find(handleList.begin(), handleList.end(), timeoutHandle);
466 if (s == handleList.end()) {
467 PTRACE(4, "SockAgg\tHandle was removed while waiting");
469 else {
470 PTime start;
472 timeoutHandle->beingProcessed = TRUE;
473 timeoutHandle->closed = !timeoutHandle->OnRead();
474 timeoutHandle->beingProcessed = FALSE;
476 unsigned duration = (unsigned)(PTime() - start).GetMilliSeconds();
477 if (duration > 50) {
478 PTRACE(4, "SockAgg\tWarning - aggregator read routine was of extended duration = " << duration << " msecs");
480 if (!timeoutHandle->closed)
481 timeoutHandle->SetPreReadDone(FALSE);
485 else if (WAIT_OBJECT_0 <= ret && ret <= (WAIT_OBJECT_0 + nCount - 1)) {
486 DWORD index = ret - WAIT_OBJECT_0;
488 // if the event was triggered, redo the select
489 if (index == nCount-1) {
490 event.Reset();
491 continue;
494 PAggregatorFD * fd = aggregatorFdList[index];
495 PAssert(fdList[index] == fd->fd, "Mismatch in fd lists");
497 PAggregatorFdToHandleMap_t::iterator r = aggregatorFdToHandleMap.find(fd->fd);
498 if (r != aggregatorFdToHandleMap.end()) {
499 PAggregatedHandle * handle = r->second;
500 PAggregatedHandleList_t::iterator s = find(handleList.begin(), handleList.end(), handle);
501 if (s == handleList.end()) {
502 PTRACE(4, "SockAgg\tHandle was removed while waiting");
504 else {
505 WSANETWORKEVENTS events;
506 WSAEnumNetworkEvents(fd->socket, fd->fd, &events);
507 if (events.lNetworkEvents != 0) {
509 // check for read events first so we process any data that arrives before closing
510 if ((events.lNetworkEvents & FD_READ) != 0) {
512 PTime start;
514 handle->beingProcessed = TRUE;
515 handle->closed = !handle->OnRead();
516 handle->beingProcessed = FALSE;
518 unsigned duration = (unsigned)(PTime() - start).GetMilliSeconds();
519 PTRACE_IF(4, duration > 50, "SockAgg\tWarning - aggregator read routine was of extended duration = " << duration << " msecs");
522 // check for socket close
523 if ((events.lNetworkEvents & FD_CLOSE) != 0)
524 handle->closed = TRUE;
526 if (!handle->closed) {
527 // prepare for next read
528 handle->SetPreReadDone(FALSE);
529 } else {
530 handle->beingProcessed = TRUE;
531 handle->OnClose();
532 handle->beingProcessed = FALSE;
534 // make sure the list is refreshed without the closed socket
535 listChanged = TRUE;
541 } // workerMutex goes out of scope
543 #else
545 #error "aggregation not yet implemented on Unix"
547 #if 0
549 P_timeval pv = timeout;
550 int ret = ::select(maxFd+1, &rfds, NULL, NULL, pv);
552 if (ret < 0) {
553 PTRACE(1, "SockAgg\tSelect failed with error " << errno);
556 // loop again if nothing was ready
557 if (ret <= 0)
558 continue;
561 PWaitAndSignal m(workerMutex);
563 // check for shutdown
564 if (shutdown)
565 break;
567 if (ret == 0) {
568 PTime start;
569 BOOL closed = !timeoutHandle->OnRead();
570 unsigned duration = (unsigned)(PTime() - start).GetMilliSeconds();
571 if (duration > 50) {
572 PTRACE(4, "SockAgg\tWarning - aggregator read routine was of extended duration = " << duration << " msecs");
574 if (!closed)
575 timeoutHandle->SetPreReadDone(FALSE);
578 // check the event first
579 else if (FD_ISSET(event.GetHandle(), &rfds)) {
580 event.Reset();
581 continue;
584 else {
585 PAggregatorFD * fd = aggregatorFdList[ret];
586 PAssert(fdList[ret] == fd->fd, "Mismatch in fd lists");
587 PAggregatorFdToHandleMap_t::iterator r = aggregatorFdToHandleMap.find(fd->fd);
588 if (r != aggregatorFdToHandleMap.end()) {
589 PAggregatedHandle * handle = r->second;
590 PTime start;
591 BOOL closed = !handle->OnRead();
592 unsigned duration = (unsigned)(PTime() - start).GetMilliSeconds();
593 if (duration > 50) {
594 PTRACE(4, "SockAgg\tWarning - aggregator read routine was of extended duration = " << duration << " msecs");
596 if (!closed)
597 handle->SetPreReadDone(FALSE);
600 } // workerMutex goes out of scope
601 #endif // #if 0
602 #endif
605 PTRACE(4, "SockAgg\taggregator finished");