Added signal handling to smtp, nailed many memory leaks on SearchTree, WorkerThread...
[fmail.git] / src / threadpool.cpp
bloba1b42a68b87f842a713e64daa5950124e2be5094
1 /*
2 libfmail: Threaded Load handler with Thread Pool
4 Copyright (C) 2007 Carlos Daniel Ruvalcaba Valenzuela <clsdaniel@gmail.com>
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License along
17 with this program; if not, write to the Free Software Foundation, Inc.,
18 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 #include <libfmail.h>
23 /* Worker Thread, has a sempahore for signaling */
24 class ThreadLoadHandler : public Thread, public Semaphore {
25 private:
26 Socket * s;
27 ProtocolHandler * ph;
28 public:
29 int onRun;
30 ThreadLoadHandler(ProtocolHandler * pht) : Thread(), Semaphore(){
31 s = NULL;
32 ph = pht;
33 onRun = 1;
36 /* Set client socket */
37 void setSocket(Socket * sock){
38 s = sock;
41 /* Deprecated */
42 int isReady(){
43 return 0;
46 /* Thread code */
47 int Run(){
48 while (onRun){
49 /* Wait for a job, decrement semaphore */
50 Wait();
52 if (s){
53 /* Run the handler */
54 ph->Handle(s);
57 s = NULL;
59 /* Decrement the semaphore */
60 Wait();
62 return 0;
66 ThreadLoad::ThreadLoad(ProtocolHandler * ph, int tpoolSize){
67 int i;
68 poolSize = tpoolSize;
69 ThreadLoadHandler * * tpool;
71 /* Allocate an array for the worker threads */
72 pool = (void * *)malloc(sizeof(ThreadLoadHandler*) * poolSize);
73 tpool = (ThreadLoadHandler * *)pool;
75 /* Create and start worker threads, all should just wait for job */
76 for (i = 0; i < poolSize; i++){
77 tpool[i] = new ThreadLoadHandler(ph);
78 tpool[i]->Start();
82 ThreadLoad::~ThreadLoad(){
83 int unclean, i;
84 ThreadLoadHandler * * tpool;
86 tpool = (ThreadLoadHandler * *)pool;
88 unclean = 1;
89 while(unclean){
90 unclean = 0;
91 for(i = 0; i < poolSize; i++){
92 if (tpool[i]->onRun){
93 if (tpool[i]->isReady()){
94 tpool[i]->onRun = 0;
95 tpool[i]->Post();
96 } else{
97 unclean = 1;
103 for(i = 0; i < poolSize; i++){
104 delete tpool[i];
106 free(pool);
107 pool = NULL;
110 /* Boss thread handling code */
111 int ThreadLoad::Dispatch(Socket * sock, ProtocolHandler * ph){
112 int i, dispatched;
113 ThreadLoadHandler * * tpool;
115 (void)ph;
116 tpool = (ThreadLoadHandler * *)pool;
118 dispatched = 0;
119 i = 0;
121 while (dispatched == 0){
122 /* Check semaphore value, if 0 the thread is waiting for work */
123 //tpool[i]->getValue(&v);
124 //if (v < 1){
125 if (tpool[i]->TryWait() == -1){
126 /* Handle the socket to the thread (work) */
127 //printf("Dispatching to thread %i\n", i);
128 tpool[i]->setSocket(sock);
130 /* Increment twice the semaphore, this to prevent
131 * handling work to a thread already doing something but
132 * with semaphore value of 0 */
133 tpool[i]->Post();
134 tpool[i]->Post();
135 tpool[i]->Post();
136 dispatched = 1;
137 } else{
138 tpool[i]->Post();
140 i++;
141 /* Wrap i around pool size */
142 if (i == poolSize)
143 i = 0;
145 return 0;