4 * Generalised Socket Aggregation functions
6 * Portable Windows Library
8 * Copyright (C) 2005 Post Increment
10 * The contents of this file are subject to the Mozilla Public License
11 * Version 1.0 (the "License"); you may not use this file except in
12 * compliance with the License. You may obtain a copy of the License at
13 * http://www.mozilla.org/MPL/
15 * Software distributed under the License is distributed on an "AS IS"
16 * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
17 * the License for the specific language governing rights and limitations
20 * The Original Code is Portable Windows Library.
22 * The Initial Developer of the Original Code is Post Increment
24 * Portions of this code were written with the financial assistance of
25 * Metreos Corporation (http://www.metros.com).
27 * Contributor(s): ______________________________________.
30 * Revision 1.19 2006/11/20 03:17:24 csoutheren
31 * Fixed incorrect check time on trace message
33 * Revision 1.18 2006/07/22 06:27:58 rjongbloed
34 * Added auo-load of Winsock v2 library required by agreggated sockets.
36 * Revision 1.17 2006/03/13 23:34:21 csoutheren
37 * Added log message when handle creates aggregator
39 * Revision 1.16 2006/03/09 05:32:59 csoutheren
40 * Reverted to conservative locking strategy, with OnClose
42 * Revision 1.15 2006/03/07 11:04:56 csoutheren
43 * Ensure socket aggregation not used on Linux
45 * Revision 1.14 2006/03/07 07:38:02 csoutheren
46 * Refine timing windows on socket handling and cleanup unused code
48 * Revision 1.13 2006/03/06 02:37:26 csoutheren
49 * Change handle locking to help prevent aggregation threads from hogging list
52 * Revision 1.12 2006/03/02 07:50:38 csoutheren
54 * Add OnClose function
56 * Revision 1.11 2006/02/28 02:26:00 csoutheren
57 * Renamed variable to be not same as member
59 * Revision 1.10 2006/02/28 02:08:02 csoutheren
60 * Modified aggregation to load balance better
62 * Revision 1.9 2006/02/08 04:02:25 csoutheren
63 * Added ability to enable and disable socket aggregation
65 * Revision 1.8 2006/01/23 05:57:39 csoutheren
66 * More aggegator implementation
68 * Revision 1.7 2006/01/18 07:16:56 csoutheren
69 * Latest version of socket aggregation code
71 * Revision 1.6 2006/01/05 11:39:32 rjongbloed
72 * Fixed DevStudio warning
74 * Revision 1.5 2006/01/03 04:23:32 csoutheren
75 * Fixed Unix implementation
77 * Revision 1.4 2005/12/23 07:49:27 csoutheren
78 * Fixed Unix implementation
80 * Revision 1.3 2005/12/23 06:44:31 csoutheren
81 * Working implementation
83 * Revision 1.2 2005/12/22 07:27:36 csoutheren
86 * Revision 1.1 2005/12/22 03:55:52 csoutheren
87 * Added initial version of socket aggregation classes
92 #pragma implementation "sockagg.h"
97 #include <ptclib/sockagg.h>
101 ////////////////////////////////////////////////////////////////
105 #pragma comment(lib, "Ws2_32.lib")
107 class LocalEvent
: public PHandleAggregator::EventBase
112 event
= CreateEvent(NULL
, TRUE
, FALSE
,NULL
);
113 PAssert(event
!= NULL
, "CreateEvent failed");
117 { CloseHandle(event
); }
119 PAggregatorFD::FD
GetHandle()
126 { ResetEvent(event
); }
132 PAggregatorFD::PAggregatorFD(SOCKET v
)
135 fd
= WSACreateEvent();
136 PAssert(WSAEventSelect(socket
, fd
, FD_READ
| FD_CLOSE
) == 0, "WSAEventSelect failed");
139 PAggregatorFD::~PAggregatorFD()
144 bool PAggregatorFD::IsValid()
146 return socket
!= INVALID_SOCKET
;
151 class LocalEvent
: public PHandleAggregator::EventBase
157 virtual ~LocalEvent()
163 PAggregatorFD::FD
GetHandle()
167 { char ch
; write(fds
[1], &ch
, 1); }
170 { char ch
; read(fds
[0], &ch
, 1); }
176 PAggregatorFD::PAggregatorFD(int v
)
181 PAggregatorFD::~PAggregatorFD()
185 bool PAggregatorFD::IsValid()
190 #endif // #endif _WIN32
193 ////////////////////////////////////////////////////////////////
195 PHandleAggregator::WorkerThreadBase::WorkerThreadBase(EventBase
& _event
)
196 : PThread(100, NoAutoDeleteThread
, NormalPriority
, "Aggregator:%0x"), event(_event
), listChanged(TRUE
), shutdown(FALSE
)
200 class WorkerThread
: public PHandleAggregator::WorkerThreadBase
204 : WorkerThreadBase(localEvent
)
212 { localEvent
.Set(); }
214 LocalEvent localEvent
;
218 ////////////////////////////////////////////////////////////////
220 PHandleAggregator::PHandleAggregator(unsigned _max
)
221 : maxWorkerSize(_max
)
225 BOOL
PHandleAggregator::AddHandle(PAggregatedHandle
* handle
)
227 // perform the handle init function
231 PWaitAndSignal
m(listMutex
);
233 // if the maximum number of worker threads has been reached, then
234 // use the worker thread with the minimum number of handles
235 if (workers
.size() >= maxWorkerSize
) {
236 WorkerList_t::iterator minWorker
= workers
.end();
237 size_t minSizeFound
= 0x7ffff;
238 WorkerList_t::iterator r
;
239 for (r
= workers
.begin(); r
!= workers
.end(); ++r
) {
240 WorkerThreadBase
& worker
= **r
;
241 PWaitAndSignal
m2(worker
.workerMutex
);
242 if (!worker
.shutdown
&& (worker
.handleList
.size() <= minSizeFound
)) {
243 minSizeFound
= worker
.handleList
.size();
248 // add the worker to the thread
249 PAssert(minWorker
!= workers
.end(), "could not find minimum worker");
251 WorkerThreadBase
& worker
= **minWorker
;
252 PWaitAndSignal
m2(worker
.workerMutex
);
253 worker
.handleList
.push_back(handle
);
254 PTRACE(4, "SockAgg\tAdding handle " << (void *)handle
<< " to aggregator - " << worker
.handleList
.size() << " handles");
255 worker
.listChanged
= TRUE
;
260 PTRACE(4, "SockAgg\tCreating new aggregator for " << (void *)handle
);
262 // no worker threads usable, create a new one
263 WorkerThread
* worker
= new WorkerThread
;
264 worker
->handleList
.push_back(handle
);
266 workers
.push_back(worker
);
268 PTRACE(4, "SockAgg\tAdding handle " << (void *)handle
<< " to new aggregator");
273 BOOL
PHandleAggregator::RemoveHandle(PAggregatedHandle
* handle
)
277 // look for the thread containing the handle we need to delete
278 WorkerList_t::iterator r
;
279 for (r
= workers
.begin(); r
!= workers
.end(); ++r
) {
280 WorkerThreadBase
* worker
= *r
;
283 worker
->workerMutex
.Wait();
285 PAggregatedHandleList_t
& hList
= worker
->handleList
;
287 // if handle is not in this thread, then continue searching
288 PAggregatedHandleList_t::iterator s
= find(hList
.begin(), hList
.end(), handle
);
289 if (s
== hList
.end()) {
290 worker
->workerMutex
.Signal();
294 PAssert(*s
== handle
, "Found handle is not correct!");
296 PAssert(!handle
->beingProcessed
, "trying to close handle that is in use");
298 // remove the handle from the worker's list of handles
299 worker
->handleList
.erase(s
);
301 // do the de-init action
304 // delete the handle if autodelete enabled
305 if (handle
->autoDelete
)
308 // if the worker thread has enough handles to keep running, trigger it to update
309 if (worker
->handleList
.size() > 0) {
310 PTRACE(4, "SockAgg\tRemoved handle " << (void *)handle
<< " from aggregator - " << worker
->handleList
.size() << " handles remaining");
311 worker
->listChanged
= TRUE
;
313 worker
->workerMutex
.Signal();
320 PTRACE(4, "SockAgg\tworker thread empty - closing down");
322 // remove the worker thread from the list of workers
325 // shutdown the thread
326 worker
->shutdown
= TRUE
;
328 worker
->workerMutex
.Signal();
333 // the worker is now finished
334 if (!worker
->WaitForTermination(10000)) {
335 PTRACE(4, "SockAgg\tWorker did not terminate promptly");
345 PAssertAlways("Cannot find aggregator handle");
350 ////////////////////////////////////////////////////////////////
352 typedef std::vector
<PAggregatorFD::FD
> fdList_t
;
355 #define FDLIST_SIZE WSA_MAXIMUM_WAIT_EVENTS
357 #define FDLIST_SIZE 64
360 void PHandleAggregator::WorkerThreadBase::Main()
362 PTRACE(4, "SockAgg\taggregator started");
365 PAggregatorFDList_t aggregatorFdList
;
367 typedef std::map
<PAggregatorFD::FD
, PAggregatedHandle
*> PAggregatorFdToHandleMap_t
;
368 PAggregatorFdToHandleMap_t aggregatorFdToHandleMap
;
372 // create the list of fds to wait on and find minimum timeout
373 PTimeInterval
timeout(PMaxTimeInterval
);
374 PAggregatedHandle
* timeoutHandle
= NULL
;
383 PWaitAndSignal
m(workerMutex
);
385 // check for shutdown
389 // if the list of handles has changed, clear the list of handles
391 aggregatorFdList
.erase (aggregatorFdList
.begin(), aggregatorFdList
.end());
392 aggregatorFdList
.reserve (FDLIST_SIZE
);
393 fdList
.erase (fdList
.begin(), fdList
.end());
394 fdList
.reserve (FDLIST_SIZE
);
395 aggregatorFdToHandleMap
.erase(aggregatorFdToHandleMap
.begin(), aggregatorFdToHandleMap
.end());
398 PAggregatedHandleList_t::iterator r
;
399 for (r
= handleList
.begin(); r
!= handleList
.end(); ++r
) {
400 PAggregatedHandle
* handle
= *r
;
406 PAggregatorFDList_t fds
= handle
->GetFDs();
407 PAggregatorFDList_t::iterator s
;
408 for (s
= fds
.begin(); s
!= fds
.end(); ++s
) {
409 fdList
.push_back ((*s
)->fd
);
410 aggregatorFdList
.push_back((*s
));
411 aggregatorFdToHandleMap
.insert(PAggregatorFdToHandleMap_t::value_type((*s
)->fd
, handle
));
415 if (!handle
->IsPreReadDone()) {
417 handle
->SetPreReadDone();
420 PTimeInterval t
= handle
->GetTimeout();
423 timeoutHandle
= handle
;
427 // add in the event fd
429 fdList
.push_back(event
.GetHandle());
434 // create the list of FDs
435 fdList_t::iterator s
;
436 for (s
= fdList
.begin(); s
!= fdList
.end(); ++s
) {
438 maxFd
= PMAX(maxFd
, *s
);
441 } // workerMutex goes out of scope
444 DWORD nCount
= fdList
.size();
445 DWORD ret
= WSAWaitForMultipleEvents(nCount
,
448 (timeout
== PMaxTimeInterval
) ? WSA_INFINITE
: (DWORD
)timeout
.GetMilliSeconds(),
451 if (ret
== WAIT_FAILED
) {
452 DWORD err
= WSAGetLastError();
453 PTRACE(1, "SockAgg\tWSAWaitForMultipleEvents error " << err
);
457 PWaitAndSignal
m(workerMutex
);
459 // check for shutdown
463 if (ret
== WAIT_TIMEOUT
) {
464 // make sure the handle did not disappear while we were waiting
465 PAggregatedHandleList_t::iterator s
= find(handleList
.begin(), handleList
.end(), timeoutHandle
);
466 if (s
== handleList
.end()) {
467 PTRACE(4, "SockAgg\tHandle was removed while waiting");
472 timeoutHandle
->beingProcessed
= TRUE
;
473 timeoutHandle
->closed
= !timeoutHandle
->OnRead();
474 timeoutHandle
->beingProcessed
= FALSE
;
476 unsigned duration
= (unsigned)(PTime() - start
).GetMilliSeconds();
478 PTRACE(4, "SockAgg\tWarning - aggregator read routine was of extended duration = " << duration
<< " msecs");
480 if (!timeoutHandle
->closed
)
481 timeoutHandle
->SetPreReadDone(FALSE
);
485 else if (WAIT_OBJECT_0
<= ret
&& ret
<= (WAIT_OBJECT_0
+ nCount
- 1)) {
486 DWORD index
= ret
- WAIT_OBJECT_0
;
488 // if the event was triggered, redo the select
489 if (index
== nCount
-1) {
494 PAggregatorFD
* fd
= aggregatorFdList
[index
];
495 PAssert(fdList
[index
] == fd
->fd
, "Mismatch in fd lists");
497 PAggregatorFdToHandleMap_t::iterator r
= aggregatorFdToHandleMap
.find(fd
->fd
);
498 if (r
!= aggregatorFdToHandleMap
.end()) {
499 PAggregatedHandle
* handle
= r
->second
;
500 PAggregatedHandleList_t::iterator s
= find(handleList
.begin(), handleList
.end(), handle
);
501 if (s
== handleList
.end()) {
502 PTRACE(4, "SockAgg\tHandle was removed while waiting");
505 WSANETWORKEVENTS events
;
506 WSAEnumNetworkEvents(fd
->socket
, fd
->fd
, &events
);
507 if (events
.lNetworkEvents
!= 0) {
509 // check for read events first so we process any data that arrives before closing
510 if ((events
.lNetworkEvents
& FD_READ
) != 0) {
514 handle
->beingProcessed
= TRUE
;
515 handle
->closed
= !handle
->OnRead();
516 handle
->beingProcessed
= FALSE
;
518 unsigned duration
= (unsigned)(PTime() - start
).GetMilliSeconds();
519 PTRACE_IF(4, duration
> 50, "SockAgg\tWarning - aggregator read routine was of extended duration = " << duration
<< " msecs");
522 // check for socket close
523 if ((events
.lNetworkEvents
& FD_CLOSE
) != 0)
524 handle
->closed
= TRUE
;
526 if (!handle
->closed
) {
527 // prepare for next read
528 handle
->SetPreReadDone(FALSE
);
530 handle
->beingProcessed
= TRUE
;
532 handle
->beingProcessed
= FALSE
;
534 // make sure the list is refreshed without the closed socket
541 } // workerMutex goes out of scope
545 #error "aggregation not yet implemented on Unix"
549 P_timeval pv
= timeout
;
550 int ret
= ::select(maxFd
+1, &rfds
, NULL
, NULL
, pv
);
553 PTRACE(1, "SockAgg\tSelect failed with error " << errno
);
556 // loop again if nothing was ready
561 PWaitAndSignal
m(workerMutex
);
563 // check for shutdown
569 BOOL closed
= !timeoutHandle
->OnRead();
570 unsigned duration
= (unsigned)(PTime() - start
).GetMilliSeconds();
572 PTRACE(4, "SockAgg\tWarning - aggregator read routine was of extended duration = " << duration
<< " msecs");
575 timeoutHandle
->SetPreReadDone(FALSE
);
578 // check the event first
579 else if (FD_ISSET(event
.GetHandle(), &rfds
)) {
585 PAggregatorFD
* fd
= aggregatorFdList
[ret
];
586 PAssert(fdList
[ret
] == fd
->fd
, "Mismatch in fd lists");
587 PAggregatorFdToHandleMap_t::iterator r
= aggregatorFdToHandleMap
.find(fd
->fd
);
588 if (r
!= aggregatorFdToHandleMap
.end()) {
589 PAggregatedHandle
* handle
= r
->second
;
591 BOOL closed
= !handle
->OnRead();
592 unsigned duration
= (unsigned)(PTime() - start
).GetMilliSeconds();
594 PTRACE(4, "SockAgg\tWarning - aggregator read routine was of extended duration = " << duration
<< " msecs");
597 handle
->SetPreReadDone(FALSE
);
600 } // workerMutex goes out of scope
605 PTRACE(4, "SockAgg\taggregator finished");