Changed smtp for new worker classes
[fmail.git] / backends / queue / queueman.cpp
blob6849b2eaa8528ba4492f64c1499df45ce04dd772
1 /*
2 FancyMail: Queue Manager
4 Copyright (C) 2007 Carlos Daniel Ruvalcaba Valenzuela <clsdaniel@gmail.com>
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License along
17 with this program; if not, write to the Free Software Foundation, Inc.,
18 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 /* TODO: Add multiple Queues support */
23 #include <libfmail.h>
24 #include <libfmail/socketipc.h>
26 #include <pcrecpp.h>
28 #include <time.h>
/*
 * Map a character to a small integer code.
 *
 *   'A'..'Z' and 'a'..'z'  ->  0..25 (letter index, case-insensitive)
 *   '_'                    ->  26
 *   ' ' and 0x0D (CR)      ->  -1
 *   anything else          ->  255
 */
int knhash(char v){
    /* Single-character special cases first */
    switch (v){
        case '_':
            return 26;
        case ' ':
        case 0x0D:
            return -1;
        default:
            break;
    }

    const bool is_upper = (v >= 'A') && (v <= 'Z');
    if (is_upper)
        return v - 'A';

    const bool is_lower = (v >= 'a') && (v <= 'z');
    if (is_lower)
        return v - 'a';

    /* Not a character this mapping knows about */
    return 255;
}
49 class Subscriber {
50 std::string subs_uri;
51 public:
52 Subscriber(const char * uri){
53 this->subs_uri = uri;
56 void Alert(char * qname){
57 IPC * ipc;
58 IPCMessage * msg;
60 ipc = IPC::CreateIPC((char *)subs_uri.c_str());
61 ipc->RequestIPC();
63 msg = new IPCMessage("alert");
64 msg->PushParam(qname);
65 ipc->SendMessage(msg);
67 delete msg;
68 ipc->CloseIPC();
69 delete ipc;
73 /* TODO: Change to state based machine, add error handling */
74 class QueueHandler : public ProtocolHandler {
75 Semaphore qlock;/* Message Queue Semaphore*/
76 queue<string *> msg_queue;/* Message Queue */
77 Semaphore slock;/* Subscriber Queue Semaphore */
78 queue<Subscriber *> subs_queue;/* Subscribe Queue */
79 char path[100];
80 public:
81 QueueHandler(const char * qpath, const char * name){
82 /* Initialize Queue Path */
83 sprintf(path, "%s/%s/", qpath, name);
85 /* Initialize semaphore value to 1 */
86 qlock.Post();
89 int Handle(Socket * s){
90 char buffer[255], * queue_name;
91 int onrun, r, ret;
92 IPC * ipc;
93 IPCMessage * msg;
94 time_t ctime;
95 struct tm * gtime;
96 string * slot;
98 onrun = 1;
100 /* Start IPC in the socket */
101 ipc = new SocketIPC(s);
103 while (onrun){
104 /* Get our message */
105 msg = ipc->ReciveMessage();
107 if (msg == NULL){
108 onrun = 0;
109 break;
112 if (!strcmp(msg->GetMessageName(), "slot")){
113 /* TODO: check that queue exists */
114 /* We recived a slot request, get for which queue*/
115 queue_name = msg->PopParam();
116 free(queue_name);
118 /* Get current time*/
119 time(&ctime);
120 gtime = gmtime(&ctime);
122 /* Generate a Random number */
123 r = rand();
125 /* Construct a unique filename for the slot */
126 sprintf(buffer, "%s%i%i%i%i%i", path, gtime->tm_yday, gtime->tm_hour, gtime->tm_min, gtime->tm_sec, r);
128 /* Send positive response, with the new slot name */
129 msg = new IPCMessage("ok");
130 msg->PushParam(buffer);
131 ipc->SendMessage(msg);
132 delete msg;
134 } else if (!strcmp(msg->GetMessageName(), "push")){
135 /* Get the slot name */
136 slot = new string(msg->PopParam());
138 /* Decrement semaphore */
139 qlock.Wait();
141 /* Push the slot to the queue */
142 msg_queue.push(slot);
144 /* Increment semaphore */
145 qlock.Post();
147 delete msg;
148 } else if (!strcmp(msg->GetMessageName(), "pop")){
149 /* Decrement semaphore */
150 qlock.Wait();
152 /* Check if there is anything in the queue */
153 if (msg_queue.size() == 0){
154 ret = 0;
155 } else{
156 /* If there is messages in the queue pop them */
157 slot = msg_queue.front();
158 msg_queue.pop();
159 ret = 1;
162 /* Increment sempahore */
163 qlock.Post();
165 delete msg;
166 if (ret){
167 /* Send a positive response with the message filename */
168 msg = new IPCMessage("ok");
169 msg->PushParam((char *)slot->c_str());
170 } else{
171 /* There is no messages in the queue, return error */
172 msg = new IPCMessage("error");
174 ipc->SendMessage(msg);
175 delete msg;
176 } else if (!strcmp(msg->GetMessageName(), "quit")){
177 /* End Session */
178 onrun = 0;
179 delete msg;
180 } else if (!strcmp(msg->GetMessageName(), "subscribe")){
181 //Subscriber();
182 } else if (!strcmp(msg->GetMessageName(), "unsubscribe")){
185 s->Close();
187 return 0;
191 class SimpleLoad : public LoadHandler {
192 public:
193 int Dispatch(Socket * sock, ProtocolHandler * ph){
194 if (ph)
195 return ph->Handle(sock);
196 return 0;
200 int main(){
201 LoadHandler * lh;
202 QueueHandler * handler;
203 Socket * s, * cl;
204 int ret, port, tcount;
205 std::string conf_handler, qname, qpath;
206 Configuration conf("queue.conf");
208 /* Load configuration */
209 port = conf.getInt("port", 14001);
210 conf_handler = conf.getString("loadhandler", "simple");
212 /* Create socket and bind to QUEUE port */
213 s = Socket::CreateSocket(SOCKET_INET, 0);
214 s->setAddress("127.0.0.1");
215 s->setPort(port);
218 if (s->Bind()){
219 printf("Error: Unable to bind to port\n");
220 return 0;
223 s->Listen(15);
225 /* Read Queue configuration */
226 qname = conf.getString("queue.name", "incoming");
227 qpath = conf.getString("queue.path", "/tmp/fmail/queue");
229 /* Create our Queue handler and Threaded LoadHandler */
230 handler = new QueueHandler(qpath.c_str(), qname.c_str());
232 if (conf_handler == "thread"){
233 tcount = conf.getInt("thread.count", 10);
234 lh = new ThreadLoad(handler, tcount);
235 } else{
236 lh = new SimpleLoad();
239 /* TODO: Exit gracefuly, don't serve forever. */
240 while(1){
241 /* Wait for incoming connections */
242 ret = s->Poll(1000000, SOCKET_POLL_READ | SOCKET_POLL_ERROR);
244 if (ret & SOCKET_POLL_READ){
245 /* Accept client and dispatch */
246 cl = s->Accept();
247 lh->Dispatch(cl, handler);
248 } else if(ret & SOCKET_POLL_ERROR){ //error
249 return -1;
250 } else{ //timeout
251 break;
255 return 0;